RxPY - 使用 Observables

  • 简述

    可观察对象是一个创建观察者并将其附加到期望值的源的函数,例如单击、来自 dom 元素的鼠标事件等。
    本章将详细研究下面提到的主题。
    • 创建 Observables
    • 订阅并执行 Observable
  • 创建 Observables 对象

    为了创建一个 observable,我们将使用create()方法并将具有以下项目的函数传递给它。
    • on_next()- 当 Observable 触发一个项目时,这个函数被调用。
    • on_completed()- 当 Observable 完成时调用此函数。
    • on_error()− 当 Observable 发生错误时调用此函数。
    要使用 create() 方法,首先导入如下所示的方法 -
    
    
    from rx import create
    
    
    这是一个工作示例,用于创建一个 observable -
    testrx.py
    
    
    from rx import create
    
    deftest_observable(observer, scheduler):
    
       observer.on_next("Hello")
    
       observer.on_error("Error")
    
       observer.on_completed()
    
    source = create(test_observable).
    
    
  • 订阅并执行 Observable

    要订阅 observable,我们需要使用 subscribe() 函数并传递回调函数 on_next、on_error 和 on_completed。
    这是一个工作示例 -
    testrx.py
    
    
    from rx import create
    
    deftest_observable(observer, scheduler):
    
       observer.on_next("Hello")
    
       observer.on_completed()
    
    source = create(test_observable)
    
    source.subscribe(
    
       on_next = lambda i: print("Got - {0}".format(i)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!"),
    
    )
    
    
    subscribe() 方法负责执行 observable。回调函数on_next,on_erroron_completed必须传递给 subscribe 方法。调用 subscribe 方法又会执行 test_observable() 函数。
    将所有三个回调函数都传递给 subscribe() 方法不是强制性的。您可以根据您的要求传递 on_next()、on_error() 和 on_completed()。
    lambda 函数用于 on_next、on_error 和 on_completed。它将接受参数并执行给定的表达式。
    这是创建的 observable 的输出 -
    
    
    E:\pyrx>python testrx.py
    
    Got - Hello
    
    Job Done!