# AMQP消费者(AMQP Consumer)

高级消息队列协议(AMQP)消费者步骤通过AMQP 0-9-1兼容代理从AMQP消息生成器接收流数据。您可以配置此步骤以使用现有AMQP消息队列或创建新队列。

您还可以设置AMQP使用者步骤,以连续从AMQP消息或代理中提取流数据,以收集有关受监视事件的消息,跟踪用户对数据流的消耗或监视警报。 父AMQP Consumer步骤运行子(子转换),该子转换根据消息批量大小或持续时间执行,使您可以近乎实时地处理连续的记录流。子转换必须以“ 从流中获取记录”步骤开始。此外,您可以选择子转换中的一个步骤,将记录流回父转换,父转换将下游的记录传递给同一父转换中包含的任何其他步骤。

由于AMQP Consumer步骤持续提取流数据,您可能希望在父转换或子转换中使用Abort 步骤来停止从AMQP获取特定工作流的记录。例如,您可以按定时计划运行父转换, 或者如果传感器数据超出预设范围,则中止子转换。

# 前言(Before You Begin)

在使用AMQP Consumer步骤之前,请注意以下条件:

  • 此步骤使用并要求AMQP 0-9-1消息传递协议。
  • 在配置此步骤之前,您必须具有AMQP 0-9-1兼容代理(例如 RabbitMQ)。
  • 在转换中,您可以单独使用AMQP Consumer步骤从任何AMQP生产者或代理中获取消息。该AMQP生产者没有必要步骤。 如果要同时使用这两个步骤(无论是在相同的转换中还是在单独的转换中),则在“消费者”步骤中指定的某些设置必须与“生产者”步骤中定义的某些设置相匹配。 下面的标签部分说明了哪些设置必须匹配。
  • 虽然您可以设置此步骤以使用现有AMQP消息队列,但您也可以使用此步骤创建新的AMQP消息队列。有关更多信息,请参阅创建新的AMQP消息队列。

# General

为“ 步骤名称”和“ 转换”字段输入以下信息:

选项 描述
步骤名称 指定画布上步骤的唯一名称。默认情况下,步骤名称设置为“ AMQP Consumer ”。
转型 通过执行以下任何操作指定要执行的子转换:
  • 输入路径
  • 单击“ 浏览”以选择现有子转换
  • 单击“ 新建”以创建并保存新的子转换。有关更多详细信息,请参阅创建和保存新子转换。

  • 选定的子转换必须以“ 从流中获取记录”步骤开始。
    如果选择与当前转换具有相同根路径的转换,则会自动插入变量$ {Internal.Entry.Current.Directory}来代替公共根路径。例如,如果当前转换的路径是: /home/admin/transformation.ktr并且您在目录/home/admin/path/sub.ktr中选择转换,则路径将自动转换为:$ {Internal.Entry .Current.Directory} /path/sub.ktr 如果使用存储库,则必须指定转换的名称。如果您不使用存储库,则必须指定转换的XML文件名。 先前由引用指定的转换将自动转换为由Pentaho存储库中的转换名称指定。

    # 创建并保存新的子转换

    如果您尚未进行子转换,则可以在设置AMQP使用者步骤时创建一个转换。单击“ 新建”按钮时,新的子转换会在新的画布选项卡中自动生成所需的从流中获取记录步骤。 您可以在子转换的“从流中获取记录”步骤中自定义所有字段和类型,以匹配在父AMQP使用者步骤的“字段”选项卡中指定的字段和类型。

    1. 在AMQP Consumer步骤中,单击New。将出现“ 另存为”对话框。
    2. 导航到要保存新子转换的位置,然后键入文件名。
    3. 单击“ 保存”。将显示一个通知框,通知您已在新选项卡中创建并打开子转换。如果您以后不希望看到此通知,请选中“不再显示此项”复选框。
    4. 单击“新建转换”选项卡以查看和编辑子转换。它自动包含从流中获取记录步骤。(可选)您可以继续构建此转换并保存它。
    5. 完成后,返回AMQP Consumer步骤。

    # 选项

    AMQP Consumer步骤要求您在“设置”,“安全性”,“批次”,“字段”和“结果”字段选项卡中指定选项和参数。每个选项卡如下所述。

    # 设置选项

    在“设置”选项卡中,指定队列名称,交换名称,交换类型以及路由键或标头的连接。您可以 创建新的AMQP消息队列使用现有的AMQP消息队列

    # 创建一个新的AMQP消息队列

    AMQP使用者设置选项卡中的选项和设置使您可以在第一次在转换中运行AMQP使用者步骤时创建新的AMQP消息队列。新的AMQP消息队列将默认为以下属性:

    • 耐用
    • 不自动删除
    • 非排他性

    使用AMQP使用者创建新队列时,第一次在转换中运行AMQP使用者步骤时会初始化代理绑定。初始化绑定后,您可以在运行AMQP Consumer步骤之前启动AMQP 0-9-1消息生成器。 作为建议的最佳实践,在开始通过AMQP生成器生成或发布任何消息之前,请始终首先运行AMQP Consumer步骤。

    要在代理上创建新的AMQP消息队列,请在“设置”选项卡中定义**“ 连接”,“ 队列名称”和“ Exchange名称”**选项,如下所示:

    选项 描述
    连接 指定此步骤连接到的AMQP代理的URI地址,以将消息提取到PDI中。有关详细信息,请参阅:rabbit mp uri
    队列名称 指定此步骤将从中提取消息的新AMQP消息队列的名称。
    第一次运行转换时,将自动创建新队列。
    新队列及其exchange属性将默认为以下属性:
  • 耐用
  • 不自动删除
  • 非排他性

  • 如果指定代理上已存在的队列名称,但现有队列具有与这些不同的参数设置,或者如果指定的队列具有不同的Exchange类型(如下),则转换将中止。
    交换名称 指定新的交换名称或绑定队列的现有交换名称.
    如果交换名称尚不存在,则默认为以下属性:
  • 耐用
  • 不自动删除

  • 将Exchange名称留空以使用DEFAULT 作为Exchange类型(如下所示)并将Exchange类型设置为DIRECT。该AMQP生产者一步都要求在其匹配的空白项设置选项卡为Exchange名称。

    按照使用现有AMQP消息队列中的说明设置剩余的Exchange类型,路由密钥或标 头选项。

    # 使用现有AMQP消息队列

    要使用代理上已存在的AMQP消息队列,请在“设置”选项卡中定义选项,如下所示:

    选项 描述
    连接 指定此步骤连接到的AMQP代理的URI地址,以将消息提取到PDI中。有关详细信息,请参阅:rabbit mp uri
    队列名称 指定此步骤将从中提取消息的现有AMQP消息队列名称。指定的队列必须符合以下参数:
  • 耐用
  • 不自动删
  • 非排他性

  • 如果指定尚不存在的队列名称,则此步骤将创建新队列。新队列将符合相同的参数。请参阅创建新的AMQP消息队列。
    交换名称 指定从中绑定队列的现有交换名称。
    将Exchange名称留空以使用DEFAULT 作为Exchange类型(如下所示)并将Exchange类型设置为DIRECT。该AMQP生产者一步都要求在其匹配的空白项设置选项卡为Exchange名称。
    交换类型 指定此交换使用的类型模式。
    DIRECT -根据消息路由密钥将消息路由到队列。
  • DEFAULT - 要使用DEFAULT交换类型,请不要指定Exchange名称 (上面)并将Exchange类型设置为DIRECT。

  • FANOUT - 将消息路由到绑定到扇出交换的所有队列,并忽略路由密钥。
    TOPIC -根据消息路由密钥与用于将队列绑定到交换的模式之间的匹配,将消息路由到一个或多个队列。
    HEADERS -使用一个或多个键/值对路由消息,这些键/值对更容易表示为消息头而不是路由键。
    路由密钥 使用“路由密钥”表指定定义交换和队列之间绑定的路由密钥。有关 更多信息, 请参阅指定路由密钥
    Headers 使用Headers表指定 与相应标头关联的Name和Value。有关 更多信息,请参阅指定标题。

    # 指定路由密钥

    使用DIRECT或TOPIC 作为Exchange 类型时,请 在“路由密钥”表中指定相应的路由密钥(或多个密钥)。路由键作为字符串名称输入。

    如果选择 DIRECT作为Exchange类型,并将Exchange名称留空,则无论是否在表中指定任何路由键,都会将在“ 队列名称”选项中指定的队列名称用作路由密钥。

    为Consumer步骤指定路由键配置并运行转换后,会将路由键和Consumer配置永久绑定到指定的队列。即使您随后从此路由密钥表中删除了路由密钥,绑定也将保留在AMQP代理中。有关如何验证队列绑定的更多信息,请参阅:https://www.rabbitmq/rabbitmqctl.8.html#list_bindings

    # 指定Headers

    使用Headers作为Exchange 类型时,请 在Headers表中指定与相应标头关联的Name和Value。仅接受字符串值。

    指定标题有两个选项:

    • 匹配所有标头 -要将消息传递到Consumer步骤的队列,生产者消息必须包含Consumer步骤中的所有标头键/值对。

      请注意,生产者可能拥有比“消费者”步骤中指定的标题更多的标题。生产者标头必须与所有指定的Consumer标头匹配; 但是, 并非所有指定的Consumer标头都必须匹配所有生成器标头。

    • 匹配任何标头 -对于要传递到Consumer步骤队列的消息,至少一个标头键/值对必须在Consumer步骤和生产者上匹配。

    为Consumer步骤指定头配置并运行转换后,会永久地将头和Consumer配置绑定到指定的队列。即使您随后从此表中删除标头,绑定也将保留在AMQP代理中。 有关如何验证队列绑定的更多信息,请参阅:https://www.rabbitmq/rabbitmqctl.8.html#list_bindings

    # 安全选项卡

    “安全”选项卡允许您为AMPQ代理定义身份验证凭据。此选项卡包括以下选项:

    选项 描述
    用户名 指定访问AMQP代理所需的用户名。
    密码 指定与用户名关联的密码。
    使用安全协议 如果要为连接定义SSL属性,请选择此选项。
    此安全协议设置仅在PDI中使用。它不用于AEL Spark。
    SSL属性 上下文算法 - 指定您正在使用的安全协议的名称。
    密钥存储区密码 - 指定希望此连接使用的密钥存储区的密码。
    密钥库路径 - 指定要使用此安全连接的密钥库的文件路径位置。
    密钥库类型 - 指定密钥库类型的标识名称或字符串。
    信任存储密码 - 指定您希望此安全连接使用的信任存储对象的密码。
    信任存储路径 - 指定希望此安全连接使用的信任存储库证书的文件路径位置。
    信任存储类型 - 指定信任存储的格式。

    # 批量标签

    批量标签

    使用此选项卡可以创建用于摄取的窗口。您可以指定消息计数和/或特定时间量。

    AMQP Consumer步骤仅支持Auto-Ack消息。当步骤接收到auto-ack消息时,它们将从队列中删除。如果在所有消息被摄取之前转换停止,您将丢失剩余的消息。

    虽然任一选项都会触发消耗,但第一个满意的选项将开始批量转换。 如果使用Spark作为处理引擎,则必须仅根据“ 持续时间(ms) ” 执行子转换。

    选项 描述
    持续时间(毫秒) 指定以毫秒为单位的时间。此值是步骤在执行转换之前收集记录所花费的时间。
    如果使用Spark作为处理引擎,则必须设置此字段。
    如果设置为值“0”,则记录数将触发消耗。
    记录数量 指定一个数字。在每个'X'个记录之后,将执行指定的转换,并将这些'X'记录传递给转换。
    如果设置为值'0',则持续时间触发消耗。

    无论是记录数时间必须包含的值大于“0”运行转变更大。

    # 字段选项卡

    使用此选项卡可以定义记录格式的字段。

    选项 描述
    输入名称 输入名称由AMQP Consumer步骤指定。默认情况下分配以下输入名称:
  • message:记录中包含的单个消息。
  • 队列名称:发布和接收记录的队列。
  • routing key:与依赖模式类型关联的路由密钥。
  • 交换名称:接收消息的交换名称。
  • 输出名称 指定替换输出名称以使用与输入名称不同的字段名称。输出名称的数据类型必须匹配类型的的输入名称。
    类型 此字段标识输入名称的数据类型,不能更改。

    # 结果字段选项卡

    使用此选项卡从子转换中选择将记录流回父转换的步骤。此功能允许父转换中的AMQP Consumer步骤处理的记录向下游传递到同一父转换中包含的任何其他步骤。

    选项 描述
    返回字段: 选择将字段流回父转换的步骤名称(来自子转换)。这些返回字段中的数据值将可用于父转换中的任何后续下游步骤。

    # 元数据注入支持

    此步骤的所有字段都支持元数据注入。您可以将此步骤与ETL元数据注入一起使用,以在运行时将元数据传递给转换。

    # 推荐阅读

    AMQP生产者