RxPY - 使用主题

  • 简述

    一个主题是一个可观察的序列,以及一个可以多播的观察者,即与许多订阅的观察者交谈。
    我们将讨论以下主题 -
    • 创建主题
    • 订阅主题
    • 将数据传递给主题
    • 行为主体
    • 重播主题
    • 异步主题
  • 创建主题

    要使用主题,我们需要导入主题,如下所示 -
    
    
    from rx.subject import Subject
    
    
    您可以按如下方式创建主题对象 -
    
    
    subject_test = Subject()
    
    
    该对象是具有三种方法的观察者 -
    • on_next(value)
    • on_error(error)
    • on_completed()
  • 订阅主题

    您可以在主题上创建多个订阅,如下所示 -
    
    
    subject_test.subscribe(
    
       lambda x: print("The value is {0}".format(x))
    
    )
    
    subject_test.subscribe(
    
       lambda x: print("The value is {0}".format(x))
    
    )
    
    
  • 将数据传递给主题

    您可以将数据传递给使用 on_next(value) 方法创建的主题,如下所示 -
    
    
    subject_test.on_next("A")
    
    subject_test.on_next("B")
    
    
    数据将传递给所有订阅,添加在主题上。
    这是该主题的一个工作示例。

    例子

    
    
    from rx.subject import Subject
    
    subject_test = Subject()
    
    subject_test.subscribe(
    
       lambda x: print("The value is {0}".format(x))
    
    )
    
    subject_test.subscribe(
    
       lambda x: print("The value is {0}".format(x))
    
    )
    
    subject_test.on_next("A")
    
    subject_test.on_next("B")
    
    
    subject_test 对象是通过调用 Subject() 创建的。subject_test 对象引用了 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 -

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is A
    
    The value is A
    
    The value is B
    
    The value is B
    
    
    我们可以使用 on_completed() 方法来停止主题执行,如下所示。

    例子

    
    
    from rx.subject import Subject
    
    subject_test = Subject()
    
    subject_test.subscribe(
    
       lambda x: print("The value is {0}".format(x))
    
    )
    
    subject_test.subscribe(
    
       lambda x: print("The value is {0}".format(x))
    
    )
    
    subject_test.on_next("A")
    
    subject_test.on_completed()
    
    subject_test.on_next("B")
    
    
    一旦我们调用了完成,后面调用的下一个方法就不会被调用。

    输出

    
    
    E:\pyrx>python testrx.py
    
    The value is A
    
    The value is A
    
    
    现在让我们看看如何调用 on_error(error) 方法。

    例子

    
    
    from rx.subject import Subject
    
    subject_test = Subject()
    
    subject_test.subscribe(
    
       on_error = lambda e: print("Error : {0}".format(e))
    
    )
    
    subject_test.subscribe(
    
       on_error = lambda e: print("Error : {0}".format(e))
    
    )
    
    subject_test.on_error(Exception('There is an Error!'))
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Error: There is an Error!
    
    Error: There is an Error!
    
    
  • 行为主体

    BehaviorSubject 将在调用时为您提供最新值。您可以创建行为主题,如下所示 -
    
    
    from rx.subject import BehaviorSubject
    
    behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
    
    
    这是一个使用行为主题的工作示例

    例子

    
    
    from rx.subject import BehaviorSubject
    
    behavior_subject = BehaviorSubject("Testing Behaviour Subject");
    
    behavior_subject.subscribe(
    
       lambda x: print("Observer A : {0}".format(x))
    
    )
    
    behavior_subject.on_next("Hello")
    
    behavior_subject.subscribe(
    
       lambda x: print("Observer B : {0}".format(x))
    
    )
    
    behavior_subject.on_next("Last call to Behaviour Subject")
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Observer A : Testing Behaviour Subject
    
    Observer A : Hello
    
    Observer B : Hello
    
    Observer A : Last call to Behaviour Subject
    
    Observer B : Last call to Behaviour Subject
    
    
  • 重播主题

    重播主体类似于行为主体,其中,它可以缓冲值并将其重播给新订阅者。这是一个重播主题的工作示例。

    例子

    
    
    from rx.subject import ReplaySubject
    
    replay_subject = ReplaySubject(2)
    
    replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
    
    replay_subject.on_next(1)
    
    replay_subject.on_next(2)
    
    replay_subject.on_next(3)
    
    replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
    
    replay_subject.on_next(5)
    
    
    重播主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于调用的新订阅者。

    输出

    
    
    E:\pyrx>python testrx.py
    
    Testing Replay Subject A: 1
    
    Testing Replay Subject A: 2
    
    Testing Replay Subject A: 3
    
    Testing Replay Subject B: 2
    
    Testing Replay Subject B: 3
    
    Testing Replay Subject A: 5
    
    Testing Replay Subject B: 5
    
    
  • 异步主题

    在 AsyncSubject 的情况下,最后调用的值被传递给订阅者,并且只有在调用 complete() 方法后才会完成。

    例子

    
    
    from rx.subject import AsyncSubject
    
    async_subject = AsyncSubject()
    
    async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
    
    async_subject.on_next(1)
    
    async_subject.on_next(2)
    
    async_subject.on_completed()
    
    async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
    
    Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Testing Async Subject A: 2
    
    Testing Async Subject B: 2