RxPY - 创建 Observables

  • 创建

    此方法用于创建可观察对象。它将具有观察者方法,即
    • on_next()− 当 Observable 发出一个项目时,该函数被调用。
    • on_completed()- 当 Observable 完成时调用此函数。
    • on_error()− 当 Observable 发生错误时调用此函数。
    这是一个工作示例 -
    testrx.py
    
    
    from rx import create
    
    def test_observable(observer, scheduler):
    
       observer.on_next("Hello")
    
       observer.on_error("Error occured")
    
       observer.on_completed()
    
    source = create(test_observable)
    
    source.subscribe(
    
       on_next = lambda i: print("Got - {0}".format(i)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!"),
    
    )
    
    
    这里是输出可观察的创建 -
    
    
    E:\pyrx>python testrx.py
    
    Got - Hello
    
    Job Done! 
    
    
  • empty

    这个 observable 不会输出任何东西,直接发出完整的状态。

    句法

    
    
    empty()
    
    

    返回值

    它将返回一个没有元素的 observable。

    例子

    
    
    from rx import empty
    
    test = empty()
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Job Done!
    
    
  • never

    此方法创建一个永远不会达到完整状态的可观察对象。

    句法

    
    
    never()
    
    

    返回值

    它将返回一个永远不会完成的可观察对象。

    例子

    
    
    from rx import never
    
    test = never()
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    It does not show any 输出.
    
    
  • throw

    此方法将创建一个会引发错误的 observable。

    句法

    
    
    throw(exception)
    
    

    参数

    exception:具有错误详细信息的对象。

    返回值

    返回一个带有错误详细信息的可观察对象。

    例子

    
    
    from rx import throw
    
    test = throw(Exception('There is an Error!'))
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Error: There is an Error!
    
    
  • from_

    此方法会将给定的数组或对象转换为可观察对象。

    句法

    
    
    from_(iterator)
    
    

    参数

    iterator:这是一个对象或数组。

    返回值

    这将为给定的迭代器返回一个 observable。

    例子

    
    
    from rx import from_
    
    test = from_([1,2,3,4,5,6,7,8,9,10])
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is 1
    
    The value is 2
    
    The value is 3
    
    The value is 4
    
    The value is 5
    
    The value is 6
    
    The value is 7
    
    The value is 8
    
    The value is 9
    
    The value is 10
    
    Job Done!
    
    
  • interval

    此方法将给出超时后产生的一系列值。

    句法

    
    
    interval(period)
    
    

    参数

    period:开始整数序列。

    返回值

    它按顺序返回一个包含所有值的可观察对象。

    例子

    
    
    import rx
    
    from rx import operators as ops
    
    rx.interval(1).pipe(
    
       ops.map(lambda i: i * i)
    
    ).subscribe(lambda x: print("The value is {0}".format(x)))
    
    input("Press any key to exit\n")
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Press any key to exit
    
    The value is 0
    
    The value is 1
    
    The value is 4
    
    The value is 9
    
    The value is 16
    
    The value is 25
    
    The value is 36
    
    The value is 49
    
    The value is 64
    
    The value is 81
    
    The value is 100
    
    The value is 121
    
    The value is 144
    
    The value is 169
    
    The value is 196
    
    The value is 225
    
    The value is 256
    
    The value is 289
    
    The value is 324
    
    The value is 361
    
    The value is 400
    
    
  • just

    此方法会将给定值转换为可观察值。

    句法

    
    
    just(value)
    
    

    参数

    value:要转换为可观察值。

    返回值

    它将返回具有给定值的 observable。

    例子

    
    
    from rx import just
    
    test = just([15, 25,50, 55])
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is [15, 25, 50, 55]
    
    Job Done!
    
    
  • range

    此方法将根据给定的输入给出一个整数范围。

    句法

    
    
    range(start, stop=None)
    
    

    参数

    start:范围开始的第一个值。
    stop:可选,要停止的范围的最后一个值。

    返回值

    这将根据给定的输入返回一个带有整数值的 observable。

    例子

    
    
    from rx import range
    
    test = range(0,10)
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is 0
    
    The value is 1
    
    The value is 2
    
    The value is 3
    
    The value is 4
    
    The value is 5
    
    The value is 6
    
    The value is 7
    
    The value is 8
    
    The value is 9
    
    Job Done!
    
    
  • repeat_value

    此方法将创建一个 observable,它将根据给定的计数重复给定的值。

    句法

    
    
    repeat_value(value=None, repeat_count=None)
    
    

    参数

    value:可选。要重复的值。
    repeat_count:可选。重复给定值的次数。

    返回值

    它将返回一个 observable,它将根据给定的计数重复给定的值。

    例子

    
    
    from rx import repeat_value
    
    test = repeat_value(44,10)
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    The value is 44
    
    Job Done!
    
    
  • start

    这个方法接受一个函数作为输入,并返回一个 observable,该 observable 将从输入函数返回值。

    句法

    
    
    start(func)
    
    

    参数

    func:将被调用的函数。

    返回值

    它返回一个 observable,该 observable 将具有来自输入函数的返回值。

    例子

    
    
    from rx import start
    
    test = start(lambda : "Hello World")
    
    test.subscribe(
    
       lambda x: print("The value is {0}".format(x)),
    
       on_error = lambda e: print("Error : {0}".format(e)),
    
       on_completed = lambda: print("Job Done!")
    
    )
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is Hello World
    
    Job Done!
    
    
  • timer

    此方法将在超时完成后按顺序发出值。

    句法

    
    
    timer(duetime)
    
    

    参数

    Duetime:它应该发出第一个值的时间。

    返回值

    它将返回一个 observable,其中包含在到期后发出的值。

    例子

    
    
    import rx
    
    from rx import operators as ops
    
    rx.timer(5.0, 10).pipe(
    
       ops.map(lambda i: i * i)
    
    ).subscribe(lambda x: print("The value is {0}".format(x)))
    
    input("Press any key to exit\n")
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Press any key to exit
    
    The value is 0
    
    The value is 1
    
    The value is 4
    
    The value is 9
    
    The value is 16
    
    The value is 25
    
    The value is 36
    
    The value is 49
    
    The value is 64