Apache Storm - Trident

  • 简述

    Trident 是 Storm 的扩展。与 Storm 一样,Trident 也是由 Twitter 开发的。开发 Trident 的主要原因是在 Storm 之上提供高级抽象以及状态流处理和低延迟分布式查询。
    Trident 使用 spout 和 bolt,但这些低级组件在执行前由 Trident 自动生成。Trident 具有函数、过滤器、连接、分组和聚合。
    Trident 将流作为一系列批次处理,这些批次称为事务。通常,这些小批量的大小将在数千或数百万个元组的数量级上,具体取决于输入流。这样一来,Trident 就与 Storm 不同,Storm 执行的是逐个元组的处理。
    批处理概念与数据库事务非常相似。每个事务都分配有一个事务 ID。一旦所有处理完成,该事务就被认为是成功的。但是,处理事务元组之一的失败将导致整个事务被重新传输。对于每个批次,Trident 将在事务开始时调用 beginCommit,并在事务结束时提交。
  • Trident topology

    Trident API 公开了一个使用“TridentTopology”类创建 Trident topology的简单选项。基本上,Trident topology从 spout 接收输入流,并对流进行有序的操作(过滤、聚合、分组等)。Storm Tuple 被 Trident Tuple 替换,Bolts 被操作替换。可以创建一个简单的Trident topology如下 -
    
    TridentTopology topology = new TridentTopology();
    
  • Trident 元组

    Trident 元组是一个命名的值列表。TridentTuple 接口是 Trident topology的数据模型。TridentTuple 接口是 Trident topology可以处理的基本数据单元。
  • Trident spout

    Trident spout 与 Storm spout 类似,具有使用 Trident 功能的附加选项。实际上,我们仍然可以使用我们在 Storm topology中使用的 IRichSpout,但它本质上是非事务性的,我们将无法使用 Trident 提供的优势。
    具有使用 Trident 特性的所有功能的基本 spout 是“ITridentSpout”。它支持事务和不透明的事务语义。其他 spout 是 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。
    除了这些通用的 spout 之外,Trident 还有许多 trident spout 的示例实现。其中之一是 FeederBatchSpout spout,我们可以使用它轻松发送Trident 元组的命名列表,而无需担心批处理、并行性等。
    FeederBatchSpout 创建和数据馈送可以如下所示完成 -
    
    TridentTopology topology = new TridentTopology();
    FeederBatchSpout testSpout = new FeederBatchSpout(
       ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
    topology.newStream("fixed-batch-spout", testSpout)
    testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
    
  • Trident 行动

    Trident 依靠“Trident Operation”来处理 trident 元组的输入流。Trident API 有许多内置操作来处理从简单到复杂的流处理。这些操作的范围从简单的验证到Trident 元组的复杂分组和聚合。让我们来看看最重要和最常用的操作。

    筛选

    过滤器是一个用于执行输入验证任务的对象。Trident 过滤器获取 trident 元组字段的子集作为输入,并根据是否满足某些条件返回真或假。如果返回 true,则元组保存在输出流中;否则,从流中删除元组。过滤器基本上会继承自BaseFilter类并实现isKeep方法。这是过滤器操作的示例实现 -
    
    public class MyFilter extends BaseFilter {
       public boolean isKeep(TridentTuple tuple) {
          return tuple.getInteger(1) % 2 == 0;
       }
    }
    input
    [1, 2]
    [1, 3]
    [1, 4]
    output
    [1, 2]
    [1, 4]
    
    可以使用“each”方法在topology中调用过滤器函数。“字段”类可用于指定输入(Trident 元组的子集)。示例代码如下 -
    
    TridentTopology topology = new TridentTopology();
    topology.newStream("spout", spout)
    .each(new Fields("a", "b"), new MyFilter())
    

    功能

    Function是用于对单个Trident 元组执行简单操作的对象。它需要一个Trident 元组字段的子集,并发出零个或多个新的Trident 元组字段。
    Function基本上继承自BaseFunction类并实现execute方法。下面给出了一个示例实现 -
    
    public class MyFunction extends BaseFunction {
       public void execute(TridentTuple tuple, TridentCollector collector) {
          int a = tuple.getInteger(0);
          int b = tuple.getInteger(1);
          collector.emit(new Values(a + b));
       }
    }
    input
    [1, 2]
    [1, 3]
    [1, 4]
    output
    [1, 2, 3]
    [1, 3, 4]
    [1, 4, 5]
    
    就像过滤器操作一样,函数操作可以在topology中使用each方法。示例代码如下 -
    
    TridentTopology topology = new TridentTopology();
    topology.newStream("spout", spout)
       .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
    

    聚合

    聚合是用于对输入批处理或分区或流执行聚合操作的对象。Trident 具有三种类型的聚合。它们如下 -
    • aggregate− 单独聚合每批Trident 元组。在聚合过程中,元组最初使用全局分组重新分区,以将同一批次的所有分区组合成一个分区。
    • partitionAggregate− 聚合每个分区而不是整批Trident 元组。分区聚合的输出完全替代了输入元组。分区聚合的输出包含单个字段元组。
    • persistentaggregate− 聚合所有批次的所有Trident 元组,并将结果存储在内存或数据库中。
    
    TridentTopology topology = new TridentTopology();
    // aggregate operation
    topology.newStream("spout", spout)
       .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
       .aggregate(new Count(), new Fields(“count”))
        
    // partitionAggregate operation
    topology.newStream("spout", spout)
       .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
       .partitionAggregate(new Count(), new Fields(“count"))
        
    // persistentAggregate - saving the count to memory
    topology.newStream("spout", spout)
       .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
    
    可以使用CombinerAggregator、ReducerAggregator 或通用Aggregator 接口创建聚合操作。上面示例中使用的“count”聚合器是内置聚合器之一。它使用“CombinerAggregator”实现。实现如下 -
    
    public class Count implements CombinerAggregator<Long> {
       @Override
       public Long init(TridentTuple tuple) {
          return 1L;
       }
        
       @Override
       public Long combine(Long val1, Long val2) {
          return val1 + val2;
       }
        
       @Override
       public Long zero() {
          return 0L;
       }
    }
    

    分组

    分组操作是一种内置操作,可以由groupBy方法。groupBy 方法通过对指定字段执行 partitionBy 对流进行重新分区,然后在每个分区中,它将组字段相等的元组组合在一起。通常,我们使用“groupBy”和“persistentAggregate”来获得分组聚合。示例代码如下 -
    
    TridentTopology topology = new TridentTopology();
    // persistentAggregate - saving the count to memory
    topology.newStream("spout", spout)
       .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
       .groupBy(new Fields(“d”)
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
    

    合并和加入

    合并和加入可以分别使用“合并”和“加入”方法来完成。合并合并一个或多个流。加入类似于合并,只是加入使用双方的Trident 元组字段来检查和加入两个流。此外,加入将仅在批处理级别下工作。示例代码如下 -
    
    TridentTopology topology = new TridentTopology();
    topology.merge(stream1, stream2, stream3);
    topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
       new Fields("key", "a", "b", "c"));
    
  • 状态维护

    Trident 提供了一种状态维护机制。状态信息可以存储在topology本身中,否则您也可以将其存储在单独的数据库中。原因是保持一个状态,即如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定此元组的状态是否先前已更新。如果元组在更新状态之前已经失败,那么重试元组将使状态稳定。但是,如果更新状态后元组失败了,那么重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保仅处理一次消息 -
    • 小批量处理元组。
    • 为每个批次分配一个唯一的 ID。如果该批次被重试,它会被赋予相同的唯一 ID。
    • 状态更新按批次排序。例如,在第一批的状态更新完成之前,第二批的状态更新是不可能的。
  • 分布式 RPC

    分布式 RPC 用于从 Trident topology中查询和检索结果。Storm 有一个内置的分布式 RPC 服务器。分布式 RPC 服务器接收来自客户端的 RPC 请求并将其传递给topology。topology处理请求并将结果发送到分布式 RPC 服务器,分布式 RPC 服务器将其重定向到客户端。Trident 的分布式 RPC 查询执行起来就像一个普通的 RPC 查询,除了这些查询是并行运行的。
  • 何时使用Trident ?

    在许多用例中,如果要求只处理一次查询,我们可以通过在 Trident 中编写topology来实现。另一方面,在 Storm 的情况下,很难实现完全一次处理。因此,Trident 对于那些只需要一次处理的用例很有用。Trident 并不适用于所有用例,尤其是高性能用例,因为它增加了 Storm 的复杂性并管理状态。
  • Trident 的工作示例

    我们将把上一节中开发的呼叫日志分析器应用程序转换为 Trident 框架。与普通的storm相比,Trident应用程序相对容易,这要归功于它的高级API。Storm 基本上需要在 Trident 中执行 Function、Filter、Aggregate、GroupBy、Join 和 Merge 操作中的任何一项。最后,我们将使用LocalDRPC类并使用executeLocalDRPC 类的方法。

    格式化通话信息

    FormatCall 类的目的是格式化包含“Caller number”和“Receiver number”的呼叫信息。完整的程序代码如下 -

    编码:FormatCall.java

    
    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;
    public class FormatCall extends BaseFunction {
       @Override
       public void execute(TridentTuple tuple, TridentCollector collector) {
          String fromMobileNumber = tuple.getString(0);
          String toMobileNumber = tuple.getString(1);
          collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
       }
    }
    

    CSVSplit

    CSVSplit 类的目的是根据“逗号 (,)”拆分输入字符串,并发出字符串中的每个单词。该函数用于解析分布式查询的输入参数。完整的代码如下 -

    编码:CSVSplit.java

    
    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;
    public class CSVSplit extends BaseFunction {
       @Override
       public void execute(TridentTuple tuple, TridentCollector collector) {
          for(String word: tuple.getString(0).split(",")) {
             if(word.length() > 0) {
                collector.emit(new Values(word));
             }
          }
       }
    }
    

    日志分析器

    这是主要的应用程序。最初,应用程序将初始化 TridentTopology 并使用FeederBatchSpout. Trident topology流可以使用newStreamTridentTopology 类的方法。同样,可以使用 Trident topology DRPC 流创建newDRCPStreamTridentTopology 类的方法。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。LocalDRPC有执行方法来搜索一些关键字。完整的代码如下。

    编码:LogAnalyserTrident.java

    
    import java.util.*;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.utils.DRPCClient;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentState;
    import storm.trident.TridentTopology;
    import storm.trident.tuple.TridentTuple;
    import storm.trident.operation.builtin.FilterNull;
    import storm.trident.operation.builtin.Count;
    import storm.trident.operation.builtin.Sum;
    import storm.trident.operation.builtin.MapGet;
    import storm.trident.operation.builtin.Debug;
    import storm.trident.operation.BaseFilter;
    import storm.trident.testing.FixedBatchSpout;
    import storm.trident.testing.FeederBatchSpout;
    import storm.trident.testing.Split;
    import storm.trident.testing.MemoryMapState;
    import com.google.common.collect.ImmutableList;
    public class LogAnalyserTrident {
       public static void main(String[] args) throws Exception {
          System.out.println("Log Analyser Trident");
          TridentTopology topology = new TridentTopology();
            
          FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
             "toMobileNumber", "duration"));
          TridentState callCounts = topology
             .newStream("fixed-batch-spout", testSpout)
             .each(new Fields("fromMobileNumber", "toMobileNumber"), 
             new FormatCall(), new Fields("call"))
             .groupBy(new Fields("call"))
             .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
             new Fields("count"));
          LocalDRPC drpc = new LocalDRPC();
          topology.newDRPCStream("call_count", drpc)
             .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
          topology.newDRPCStream("multiple_call_count", drpc)
             .each(new Fields("args"), new CSVSplit(), new Fields("call"))
             .groupBy(new Fields("call"))
             .stateQuery(callCounts, new Fields("call"), new MapGet(), 
             new Fields("count"))
             .each(new Fields("call", "count"), new Debug())
             .each(new Fields("count"), new FilterNull())
             .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
          Config conf = new Config();
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("trident", conf, topology.build());
          Random randomGenerator = new Random();
          int idx = 0;
            
          while(idx < 10) {
             testSpout.feed(ImmutableList.of(new Values("1234123401", 
                "1234123402", randomGenerator.nextInt(60))));
             testSpout.feed(ImmutableList.of(new Values("1234123401", 
                "1234123403", randomGenerator.nextInt(60))));
             testSpout.feed(ImmutableList.of(new Values("1234123401", 
                "1234123404", randomGenerator.nextInt(60))));
             testSpout.feed(ImmutableList.of(new Values("1234123402", 
                "1234123403", randomGenerator.nextInt(60))));
             idx = idx + 1;
          }
          System.out.println("DRPC : Query starts");
          System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
          System.out.println(drpc.execute("multiple_call_count", "1234123401 -
             1234123402,1234123401 - 1234123403"));
          System.out.println("DRPC : Query ends");
          cluster.shutdown();
          drpc.shutdown();
          // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
       }
    }
    
  • 构建和运行应用程序

    完整的应用程序包含三个 Java 代码。它们如下 -
    • FormatCall.java
    • CSVSplit.java
    • LogAnalyerTrident.java
    可以使用以下命令构建应用程序 -
    
    javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
    
    该应用程序可以使用以下命令运行 -
    
    java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
    

    输出

    一旦应用程序启动,应用程序将输出关于集群启动过程、操作处理、DRPC Server和客户端信息,最后是集群关闭过程的完整细节。此输出将显示在控制台上,如下所示。
    
    DRPC : Query starts
    [["1234123401 - 1234123402",10]]
    DEBUG: [1234123401 - 1234123402, 10]
    DEBUG: [1234123401 - 1234123403, 10]
    [[20]]
    DRPC : Query ends