GVKun编程网logo

rx.subjects.ReplaySubject的实例源码(rxjava源码解析)

5

在本文中,我们将带你了解rx.subjects.ReplaySubject的实例源码在这篇文章中,我们将为您详细介绍rx.subjects.ReplaySubject的实例源码的方方面面,并解答rxj

在本文中,我们将带你了解rx.subjects.ReplaySubject的实例源码。在这篇文章中,我们将为您详细介绍rx.subjects.ReplaySubject的实例源码的方方面面,并解答rxjava源码解析常见的疑惑。同时我们还将给您一些技巧,以帮助您更好地了解以下内容:Angular 中的 Subject vs BehaviorSubject vs ReplaySubject、BehaviorSubject与 Subject 不同之处、c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult、com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码。

本文目录一览:

rx.subjects.ReplaySubject的实例源码(rxjava源码解析)

rx.subjects.ReplaySubject的实例源码(rxjava源码解析)

@H_301_1@项目:incubator-servicecomb-java-chassis    @H_301_1@文件:BizkeeperHandlerDelegate.java   
/**
 * Builds a command that emits the fallback response for the given invocation.
 *
 * @param invocation the invocation handed to the fallback policy lookup
 * @return a HystrixObservable whose source emits the fallback response, or signals
 *         the exception thrown while obtaining or delivering it
 */
protected HystrixObservable<Response> forceFallbackCommand(Invocation invocation) {
  return new HystrixObservable<Response>() {
    @Override
    public Observable<Response> toObservable() {
      return Observable.create(subscriber -> {
        try {
          subscriber.onNext(FallbackPolicyManager.getFallbackResponse(handler.groupname, invocation));
        } catch (Exception failure) {
          subscriber.onError(failure);
        }
      });
    }

    @Override
    public Observable<Response> observe() {
      // Subscribe to the source right away and hand callers the subject fed by it.
      ReplaySubject<Response> replay = ReplaySubject.create();
      final Subscription upstream = toObservable().subscribe(replay);
      // Unsubscribing from the returned observable also unsubscribes from the source.
      return replay.doOnUnsubscribe(upstream::unsubscribe);
    }
  };
}
@H_301_1@项目:Asynchronous-Android-Programming    @H_301_1@文件:SubjectActivity.java   
/**
 * Walks through a ReplaySubject scenario and logs each step: 1 and 2 are pushed
 * before the observer subscribes, 3 and 4 while it is subscribed, and 5 plus the
 * completion signal after it has unsubscribed.
 */
void replaySubject(){
    final Action0 onSubscribed = new Action0() {
        @Override
        public void call() {
            Log.i(TAG,"Observer subscribed to ReplaySubject");
        }
    };
    final Action0 onUnsubscribed = new Action0() {
        @Override
        public void call() {
            Log.i(TAG,"Observer unsubscribed to ReplaySubject ");
        }
    };
    final Action1<Integer> onValue = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            Log.i(TAG,"New Event received from ReplaySubject: " + value);
        }
    };

    ReplaySubject<Integer> source = ReplaySubject.create();
    // Emitted before anyone is subscribed.
    source.onNext(1);
    source.onNext(2);

    Subscription handle = source
            .doOnSubscribe(onSubscribed)
            .doOnUnsubscribe(onUnsubscribed)
            .subscribe(onValue);

    // Emitted while the observer is subscribed.
    source.onNext(3);
    source.onNext(4);
    handle.unsubscribe();

    // Emitted after the observer has unsubscribed.
    source.onNext(5);
    source.onCompleted();
}
@H_301_1@项目:simplDb    @H_301_1@文件:SimplDbRxTest.java   
/**
 * Inserts two rows ("Data", then "data") while a ReplaySubject collects the cursors
 * emitted by observing {@code TableTest}, then checks that the last collected cursor
 * holds both rows, with "Data" first and "data" last.
 */
@Test
public void observe() throws Exception {
    ReplaySubject<Cursor> collector = ReplaySubject.create();
    Subscription subscription = mSimplDb.get().observe(TableTest.class).subscribe(collector);

    mInsert.contentValues.put(DATA,"Data");
    mSimplDb.get().insert(mInsert).toBlocking().single();
    mInsert.contentValues.put(DATA,"data");
    mSimplDb.get().insert(mInsert).toBlocking().single();

    // Complete the collector so that toList() below can terminate.
    collector.onCompleted();
    subscription.unsubscribe();

    // Only the most recently emitted cursor is inspected.
    Cursor cursor = collector.toList().toBlocking().single().get(collector.size() - 1);
    assertEquals(2,cursor.getCount());
    // Cursor navigation methods are moveToFirst()/moveToLast() (camel case).
    cursor.moveToFirst();
    assertData("Data",cursor);
    cursor.moveToLast();
    assertData("data",cursor);
}
@H_301_1@项目:RxRoboBase    @H_301_1@文件:AuthenticationTest.java   
/**
 * Records the authentication state (user present or not) in a ReplaySubject, signs in
 * anonymously, signs out again, and expects the first three recorded states to be
 * false, true, false.
 */
@Test
public void testObserveAuth() throws Exception {
    Observable<Boolean> isAuthenticated =
            observeAuth(firebaseAuth)
            .map(new Func1<FirebaseUser,Boolean>() {
                @Override
                public Boolean call(FirebaseUser user) {
                    return user != null;
                }
            });

    // Subscribe before triggering any auth change so every state is captured.
    ReplaySubject<Boolean> userState = ReplaySubject.create();
    isAuthenticated.subscribe(userState);

    await(authAnonymously(firebaseAuth));
    firebaseAuth.signOut();

    List<Boolean> observedState = await(userState.take(3).toList());
    // The Hamcrest entry point is assertThat (capital T).
    assertThat(observedState,contains(false,true,false));
}
@H_301_1@项目:marvel    @H_301_1@文件:SearchInteractorImpl.java   
/**
 * Returns the character search stream. A new API request is started only when there
 * is no subscription yet or the existing one is already unsubscribed; otherwise the
 * current subject is handed out again.
 *
 * @param query      search text passed to the API
 * @param privateKey API private key, used only for hash generation
 * @param publicKey  API public key, used for hash generation and sent to the API
 * @param timestamp  timestamp used for hash generation and sent to the API
 * @return the current character subject exposed as a plain Observable
 */
@Override
public Observable<CharactersResponse> loadCharacter(String query,String privateKey,String publicKey,long timestamp) {
    final boolean requestRunning =
            characterSubscription != null && !characterSubscription.isUnsubscribed();

    if (!requestRunning) {
        characterSubject = ReplaySubject.create();

        // generate hash using timestamp and API keys
        final String signature = HashGenerator.generate(timestamp, privateKey, publicKey);

        characterSubscription = api
                .getCharacters(query, publicKey, signature, timestamp)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(characterSubject);
    }

    return characterSubject.asObservable();
}
@H_301_1@项目:fyber_mobile_offers    @H_301_1@文件:AppGoogleAds.java   
/**
 * Returns a stream of the Google advertising id. The memory and Google sources are
 * concatenated and the first non-null value wins; the lookup is (re)started only when
 * no subscription exists or the existing one is already unsubscribed.
 *
 * @return the current ad-id subject exposed as a plain Observable
 */
@Override
public Observable<String> getAdIdobservable() {
    final boolean lookupRunning = adIdSubscription != null && !adIdSubscription.isUnsubscribed();

    if (!lookupRunning) {
        adIdSubject = ReplaySubject.create();

        final Observable<String> lookup = Observable
                .concat(getAdIdFromMemoryObservable(), getAdIdFromGoogleObservable())
                .first(candidate -> candidate != null)
                // lets retry if something goes wrong!
                .retry(2);

        adIdSubscription = lookup.subscribe(adIdSubject);
    }

    return adIdSubject.asObservable();
}
@H_301_1@项目:fyber_mobile_offers    @H_301_1@文件:AppGoogleAds.java   
/**
 * Returns a stream of the Google "tracking enabled" flag. The memory and Google
 * sources are concatenated and the first non-null value wins; the lookup is
 * (re)started only when no subscription exists or the existing one is already
 * unsubscribed.
 *
 * @return the current flag subject exposed as a plain Observable
 */
@Override
public Observable<Boolean> getAdIdEnabledobservable() {
    final boolean lookupRunning =
            adIdEnabledSubscription != null && !adIdEnabledSubscription.isUnsubscribed();

    if (!lookupRunning) {
        adIdEnabledSubject = ReplaySubject.create();

        final Observable<Boolean> lookup = Observable
                .concat(getAdIdEnabledFromMemoryObservable(), getAdIdEnabledFromGoogleObservable())
                .first(candidate -> candidate != null)
                // lets retry if something goes wrong!
                .retry(2);

        adIdEnabledSubscription = lookup.subscribe(adIdEnabledSubject);
    }

    return adIdEnabledSubject.asObservable();
}
@H_301_1@项目:android-buruberi    @H_301_1@文件:RxTests.java   
/**
 * Lifts the unbuffered observe-on operator (with the immediate scheduler) onto a
 * ReplaySubject, pushes the value 3 three times and expects the subscriber to have
 * accumulated 9.
 */
@Test
public void operatorUnbufferedobserveOn() throws Exception {
    final AtomicInteger total = new AtomicInteger();
    final Action1<Integer> accumulate = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            total.addAndGet(value);
        }
    };

    ReplaySubject<Integer> source = ReplaySubject.createWithSize(1);
    source.lift(new Rx.OperatorUnbufferedobserveOn<Integer>(Schedulers.immediate()))
            .subscribe(accumulate);

    for (int i = 0; i < 3; i++) {
        source.onNext(3);
    }

    assertEquals(9,total.get());
}
@H_301_1@项目:Jockey    @H_301_1@文件:ObservableQueue.java   
/**
 * Creates an Observable stream from this queue. There should only ever be one subscriber to
 * this method. Calling this method twice will complete any previously opened observable
 * (leaving unprocessed elements in the queue).
 * @return An observable containing the contents of the queue in order
 */
public Observable<T> toObservable() {
    // Keep the freshly created subject in a local so the returned stream is built from
    // exactly the subject created under the lock, even if the field is replaced by a
    // concurrent call after the lock is released.
    final ReplaySubject<T> subject;

    synchronized (mlock) {
        if (mSubject != null) {
            mSubject.onCompleted();
        }

        subject = mQueue.isEmpty()
                ? ReplaySubject.<T>create()
                : ReplaySubject.<T>create(mQueue.size());
        mSubject = subject;

        // Pre-load the subject with everything currently queued (no-op when empty).
        for (T data : mQueue) {
            subject.onNext(data);
        }
    }

    // Each item delivered to the subscriber removes the head of the queue.
    return subject.map(item -> {
        mQueue.remove();
        return item;
    }).asObservable();
}
@H_301_1@项目:RxSamplesPractice    @H_301_1@文件:Samples.java   
/**
 * Logs what two observers of one ReplaySubject receive: the first subscribes after 5
 * was pushed, the second after 10; each is unsubscribed in turn while further values
 * (20, 40, 80) are pushed.
 */
public static void replaySubject() {
    ReplaySubject<Integer> source = ReplaySubject.create();
    source.onNext(5);

    Action1<Integer> firstLogger = value -> Log.i("From action1",String.valueOf(value));
    Subscription first = source.subscribe(firstLogger);
    source.onNext(10);

    Action1<Integer> secondLogger = value -> Log.i("From action2",String.valueOf(value));
    Subscription second = source.subscribe(secondLogger);
    source.onNext(20);

    // Only the second observer remains for 40.
    first.unsubscribe();
    source.onNext(40);

    // No observer remains for 80.
    second.unsubscribe();
    source.onNext(80);
}
@H_301_1@项目:arctor    @H_301_1@文件:WaitViewReplayTransformer.java   
/**
 * Subscribes to the source immediately, recording its notifications in a ReplaySubject,
 * and exposes them only while {@code view} emits {@code true}; a {@code false} switches
 * to an empty stream. Unsubscribing from the result unsubscribes from the source.
 *
 * @param observable the source stream
 * @return the gated stream
 */
@Override
public Observable<T> call(final Observable<T> observable) {
    final ReplaySubject<Notification<T>> recorded = ReplaySubject.create();
    final Subscription upstream = observable.materialize().subscribe(recorded);

    final Func1<Boolean,Observable<Notification<T>>> gate =
            new Func1<Boolean,Observable<Notification<T>>>() {
                @Override
                public Observable<Notification<T>> call(final Boolean flag) {
                    return flag ? recorded : Observable.<Notification<T>>empty();
                }
            };
    final Action0 releaseUpstream = new Action0() {
        @Override
        public void call() {
            upstream.unsubscribe();
        }
    };

    return view
            .switchMap(gate)
            .doOnUnsubscribe(releaseUpstream)
            .dematerialize();
}
@H_301_1@项目:htm.java    @H_301_1@文件:Publisher.java   
/**
 * Checks that the first {@code HEADER_SIZE} header lines are all present, pushes them
 * into a fresh size-bounded ReplaySubject and wraps that subject in a new
 * {@link Publisher}. The notifier, when set, is handed the new Publisher before it
 * is returned.
 *
 * @return  a new Publisher
 * @throws IllegalStateException if any of the header lines is {@code null}
 */
public Publisher build() {
    subject = ReplaySubject.createWithSize(3);

    int index = 0;
    while (index < HEADER_SIZE) {
        if (lines[index] == null) {
            throw new IllegalStateException("Header not properly formed (must contain 3 lines) see Header.java");
        }
        subject.onNext(lines[index]);
        index++;
    }

    final Publisher publisher = new Publisher();
    publisher.subject = subject;

    if (notifier != null) {
        notifier.accept(publisher);
    }
    return publisher;
}
@H_301_1@项目:dokchess    @H_301_1@文件:MinimaxParalleleSuche.java   
/**
 * Starts a search over all valid moves of the given position. One task per move is
 * subscribed to the shared result subject and submitted to the executor service. When
 * there is no valid move, the observer is completed immediately.
 *
 * @param stellung the position to search from
 * @param subject  the observer that receives the search outcome
 */
@Override
public final void zugSuchen(Stellung stellung,Observer<Zug> subject) {
    final Collection<Zug> candidates = spielregeln.liefereGueltigeZuege(stellung);

    // Nothing to search: finish right away.
    if (candidates.isEmpty()) {
        subject.onCompleted();
        return;
    }

    final ReplaySubject<BewerteterZug> results = ReplaySubject.create();
    aktuelleSuchErgebnisse = results;

    results.subscribe(new ErgebnisMelden(subject, candidates.size()));

    for (Zug candidate : candidates) {
        EinzelnenZugUntersuchen task = new EinzelnenZugUntersuchen(stellung, candidate, results);
        results.subscribe(task);
        executorService.execute(task);
    }
}
@H_301_1@项目:rain-or-shine    @H_301_1@文件:CoordinatedWeatherManager.java   
/**
 * Returns the list of city names formatted as "name,countrycode". The names are
 * requested from the GeoNames client on the first call only; later calls reuse the
 * same subject.
 *
 * @return the city-name subject exposed as a plain Observable
 */
private Observable<List<String>> getCityNames() {
    if (cityNames == null) {
        cityNames = ReplaySubject.create(1);

        geoNamesClient.getCities()
                .flatMap(response -> Observable.from(response.geonames))
                .map(entry -> String.format("%s,%s", entry.name, entry.countrycode))
                .toList()
                .subscribe(cityNames);
    }

    Log.d(TAG,"Returning city names");
    return cityNames.asObservable();
}
@H_301_1@项目:RxNetty    @H_301_1@文件:RemoteObservableTest.java   
/**
 * Serves a list of two integer ranges (0..100 and 100..200) through a remote
 * observable on a port acquired from the 8000-9000 range, connects to it on localhost
 * and checks that the sum of all received values is 20200.
 */
@Test
public void testServeListObservables(){
    // setup
    List<Observable<Integer>> ranges = new LinkedList<Observable<Integer>>();
    ranges.add(Observable.range(0,101));
    ranges.add(Observable.range(100,101));

    ReplaySubject<List<Observable<Integer>>> served = ReplaySubject.create();
    served.onNext(ranges);

    // serve
    int port = new PortSelectorWithinRange(8000,9000).acquirePort();
    RemoteObservable.serveMany(port, served, Codecs.integer());

    // connect
    Observable<Integer> remote = RemoteObservable.connect("localhost", port, Codecs.integer());

    // assert
    Action1<Integer> checkSum = new Action1<Integer>(){
        @Override
        public void call(Integer sum) {
            Assert.assertEquals(20200,sum.intValue()); // sum of number 0-200
        }
    };
    Observable.sumInteger(remote).toBlockingObservable().forEach(checkSum);
}
@H_301_1@项目:LiteReader    @H_301_1@文件:RxFile.java   
/**
 * Saves the bitmap into the given folder under the given name and reports the outcome
 * through {@code onSuccess(path)} or {@code onFail(exception)}.
 * NOTE(review): onSuccess/onFail presumably publish into {@code bitmapSubject} — confirm.
 *
 * @param bitmap the bitmap to store
 * @param folder the destination folder
 * @param name   the file name to store the bitmap under
 * @return the subject created for this call
 */
public ReplaySubject<String> saveBitmap(final Bitmap bitmap,final File folder,final String name) {
    // A fresh subject per call; the field is replaced before the save is attempted.
    bitmapSubject = ReplaySubject.create();
    try {
        String path = FileUtil.savePhoto(bitmap,folder,name);
        onSuccess(path);
    } catch (IOException e) {
        // Throwable's method is printStackTrace() (camel case).
        e.printStackTrace();
        onFail(e);
    }
    return bitmapSubject;
}
@H_301_1@项目:RxBusLib    @H_301_1@文件:SubscriberReplayEvent.java   
/**
 * Creates the replay subject for this subscriber and subscribes the event handler to
 * it, with back-pressure buffering and the configured observe/subscribe schedulers.
 * Events are passed to {@code handleEvent} only while {@code valid} is true.
 */
@Override
protected final void initObservable() {
    subject = ReplaySubject.create();
    subject.onBackpressureBuffer()
            .observeOn(EventThread.getScheduler(observeThread))
            .subscribeOn(EventThread.getScheduler(subscribeThread))
            .subscribe(received -> {
                try {
                    if (valid) {
                        handleEvent(received);
                    }
                } catch (InvocationTargetException failure) {
                    throwRuntimeException("Could not dispatch event: " + received.getClass() + " to subscriber " + SubscriberReplayEvent.this,failure);
                }
            });
}
@H_301_1@项目:bcg    @H_301_1@文件:UserInteractorImpl.java   
/**
 * Returns the login response stream. A new API call is started only when there is no
 * login subscription yet or the existing one is already unsubscribed.
 *
 * @param email    the account e-mail
 * @param password the account password
 * @return the current login subject exposed as a plain Observable
 */
@Override
public Observable<Response<LoginResponse>> login(String email,String password) {
    final boolean requestRunning = loginSubscription != null && !loginSubscription.isUnsubscribed();

    if (!requestRunning) {
        loginSubject = ReplaySubject.create();
        loginSubscription = api
                .login(email, password)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(loginSubject);
    }

    return loginSubject.asObservable();
}
@H_301_1@项目:bcg    @H_301_1@文件:UserInteractorImpl.java   
/**
 * Returns the profile response stream. A new API call is started only when there is
 * no profile subscription yet or the existing one is already unsubscribed.
 *
 * @param token  the token passed to the API
 * @param userId id of the user whose profile is requested
 * @return the current profile subject exposed as a plain Observable
 */
@Override
public Observable<Response<ProfileResponse>> getProfile(String token,int userId) {
    final boolean requestRunning = profileSubscription != null && !profileSubscription.isUnsubscribed();

    if (!requestRunning) {
        profileSubject = ReplaySubject.create();
        profileSubscription = api
                .getProfile(token, userId)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(profileSubject);
    }

    return profileSubject.asObservable();
}
@H_301_1@项目:bcg    @H_301_1@文件:UserInteractorImpl.java   
/**
 * Returns the avatar-upload response stream. A new API call is started only when
 * there is no avatar subscription yet or the existing one is already unsubscribed.
 *
 * @param token        the token passed to the API
 * @param userId       id of the user whose avatar is set
 * @param avatarBase64 the avatar payload passed to the API
 * @return the current avatar subject exposed as a plain Observable
 */
@Override
public Observable<Response<AvatarResponse>> setAvatar(String token,int userId,String avatarBase64) {
    final boolean requestRunning = avatarSubscription != null && !avatarSubscription.isUnsubscribed();

    if (!requestRunning) {
        avatarSubject = ReplaySubject.create();
        avatarSubscription = api
                .setAvatar(token, userId, avatarBase64)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(avatarSubject);
    }

    return avatarSubject.asObservable();
}
@H_301_1@项目:Mondo    @H_301_1@文件:FakeObservableCache.java   
/**
 * Looks up the subject registered for the given item class, creating and registering
 * a new ReplaySubject when none is stored yet.
 *
 * @param itemClass the key the subject is stored under
 * @return the existing or newly created subject
 */
@NonNull
private ReplaySubject<Result<T>> getorCreateSubjectFor(Class<T> itemClass) {
    ReplaySubject<Result<T>> found = subjects.get(itemClass);
    if (found == null) {
        found = ReplaySubject.create();
        subjects.put(itemClass, found);
    }
    return found;
}
@H_301_1@项目:fetlife-oss    @H_301_1@文件:FetLifeAccountManager.java   
/**
 * Adds the account to the account manager (or updates its password when it could not
 * be added), emits it on a new ReplaySubject, makes it the current account and, when
 * an authenticator response is present in the calling activity's extras, reports the
 * newly created account back to it.
 *
 * @param callingActivity the launching activity, may be {@code null}
 * @param me              the user serialized to JSON and stored as account user data
 * @param user            the account name
 * @param password        the account password
 * @return an Observable that emits the account
 */
public Observable<Account> createAccount(final Activity callingActivity,final User me,final String user,final String password)
{
    final ReplaySubject<Account> result = ReplaySubject.create();

    // Create the account if necessary
    final Account account = new Account(user, accountType);
    final Bundle userData = new Bundle();
    userData.putString(USER_DATA, gson.toJson(me));

    final boolean created = accountManager.addAccountExplicitly(account, password, userData);
    if (!created) {
        // If we didn't create the account, at least make sure to update it
        accountManager.setPassword(account, password);
    } else {
        FetLife.getBus().post(new AccountCreatedEvent(account));
    }

    result.onNext(account);
    setCurrentAccount(account);

    if (callingActivity != null) {
        // Check whether we were launched explicitly to create the account
        final Bundle extras = callingActivity.getIntent().getExtras();
        if (extras != null && created) {
            final AccountAuthenticatorResponse authResponse =
                    extras.getParcelable(AccountManager.KEY_ACCOUNT_AUTHENTICATOR_RESPONSE);
            if (authResponse != null) {
                // Let them know we succeeded if so...
                final Bundle outcome = new Bundle();
                outcome.putString(AccountManager.KEY_ACCOUNT_NAME, user);
                outcome.putString(AccountManager.KEY_ACCOUNT_TYPE, accountType);
                authResponse.onResult(outcome);
            }
        }
    }

    return result.asObservable();
}
@H_301_1@项目:weather_app    @H_301_1@文件:WeatherInteractorImpl.java   
/**
 * Returns the weather stream for a city. The memory, disk and network sources are
 * concatenated and the first entry that is non-null, belongs to the requested city
 * and is up to date wins. A new lookup starts only when there is no subscription yet
 * or the existing one is already unsubscribed.
 *
 * @param city the city to load weather for
 * @return the current weather subject exposed as a plain Observable
 */
@Override
public Observable<WeatherMix> loadWeather(String city) {
    final boolean lookupRunning = weatherSubscription != null && !weatherSubscription.isUnsubscribed();

    if (!lookupRunning) {
        weatherSubject = ReplaySubject.create();

        weatherSubscription = Observable
                .concat(memoryWeather(), diskWeather(), networkWeather(city))
                .first(candidate -> candidate != null && isSameCity(city, candidate) && isUpToDate(candidate))
                .subscribe(weatherSubject);
    }

    return weatherSubject.asObservable();
}
@H_301_1@项目:weather_app    @H_301_1@文件:WeatherInteractorImpl.java   
/**
 * Returns the weather-history stream for a city and time range. A new network request
 * starts only when there is no subscription yet or the existing one is already
 * unsubscribed.
 *
 * @param city  the city to load history for
 * @param start range start passed to the network call
 * @param end   range end passed to the network call
 * @return the current history subject exposed as a plain Observable
 */
@Override
public Observable<WeatherHistory> loadWeatherHistory(String city,long start,long end) {
    final boolean requestRunning =
            weatherHistorySubscription != null && !weatherHistorySubscription.isUnsubscribed();

    if (!requestRunning) {
        weatherHistorySubject = ReplaySubject.create();
        weatherHistorySubscription =
                networkWeatherHistory(city, start, end).subscribe(weatherHistorySubject);
    }

    return weatherHistorySubject.asObservable();
}
@H_301_1@项目:RxRoboBase    @H_301_1@文件:WriteTests.java   
@Test
public void testObserve() throws Exception {
    ReplaySubject<String> values = ReplaySubject.create();

    observe(reference)
            .map(new Func1<DataSnapshot,String>() {
                @Override
                public String call(DataSnapshot dataSnapshot) {
                    return dataSnapshot.getValue(String.class);
                }
            })
            .distinctUntilChanged()
            .subscribe(values);

    Observable
            .concat(
                    setValue(reference,null),setValue(reference,"foo"),"bar"),null)
            )
            .subscribe(Subscribers.<Void>empty());

    List<String> observedValues = await(values.take(4).toList());

    assertthat(observedValues,contains(null,"foo","bar",null));
}
@H_301_1@项目:lumber-mill    @H_301_1@文件:VertxHttpRequestWrapper.java   
/**
 * Sends the given bytes as the request body and returns a subject that emits the
 * response (status code plus body bytes) once the body has arrived, then completes.
 * Errors reported to the request's exception handler are forwarded to the subject.
 *
 * @param data the payload to send
 * @return a subject emitting the single response
 */
public Observable<Net.HttpResponse> write (ByteString data) {
    final Buffer payload = Buffer.buffer(data.toByteArray());
    final ReplaySubject<Net.HttpResponse> result = ReplaySubject.createWithSize(1);

    request.handler(response -> {
        final boolean accepted = okStatuses.contains(response.statusCode());
        if (!accepted) {
            throw new IllegalStateException(format("Unexpected statusCode %s and message %s",response.statusCode(),response.statusMessage()));
        }

        response.bodyHandler(content -> {
            result.onNext(new Net.HttpResponse() {
                @Override
                public int status() {
                    return response.statusCode();
                }

                @Override
                public BytesEvent data() {
                    return Codecs.BYTES.from(content.getBytes());
                }
            });
            result.onCompleted();
        });
    })
    .exceptionHandler(result::onError)
    .putHeader("Content-Length",String.valueOf(payload.length()))
    .end(payload);

    return result;
}
@H_301_1@项目:quandoo    @H_301_1@文件:TablesInteractorImpl.java   
/**
 * Returns the tables stream. A new API call (subscribed on the background thread,
 * observed on the main thread) starts only when there is no subscription yet or the
 * existing one is already unsubscribed.
 *
 * @return the current tables subject exposed as a plain Observable
 */
@Override
public Observable<boolean[]> loadTables() {
    final boolean requestRunning = tablesSubscription != null && !tablesSubscription.isUnsubscribed();

    if (!requestRunning) {
        tablesSubject = ReplaySubject.create();
        tablesSubscription = api
                .getTables()
                .subscribeOn(scheduler.backgroundThread())
                .observeOn(scheduler.mainThread())
                .subscribe(tablesSubject);
    }

    return tablesSubject.asObservable();
}
@H_301_1@项目:quandoo    @H_301_1@文件:CustomersInteractorImpl.java   
/**
 * Returns the customers stream. A new API call (subscribed on the background thread,
 * observed on the main thread) starts only when there is no subscription yet or the
 * existing one is already unsubscribed.
 *
 * @return the current customers subject exposed as a plain Observable
 */
@Override
public Observable<CustomerResponse[]> loadCustomers() {
    final boolean requestRunning =
            customeRSSubscription != null && !customeRSSubscription.isUnsubscribed();

    if (!requestRunning) {
        customeRSSubject = ReplaySubject.create();
        customeRSSubscription = api
                .getCustomers()
                .subscribeOn(scheduler.backgroundThread())
                .observeOn(scheduler.mainThread())
                .subscribe(customeRSSubject);
    }

    return customeRSSubject.asObservable();
}
@H_301_1@项目:PocketBeer    @H_301_1@文件:RxReplaySubjectTester.java   
/**
 * A subscriber attached before "One" and "Two" are pushed and the subject completes
 * must see both values, no error, and completion.
 */
@Test
public void should_emits_all_items_and_call_onComplete_without_throwing_exception_when_ReplaySubject_without_error() {
  Subject<String,String> subject = ReplaySubject.<String>create();
  subject.subscribe(mTestSubscriber);

  // when
  subject.onNext("One");
  subject.onNext("Two");
  subject.onCompleted();

  // then
  mTestSubscriber.assertValues("One","Two");
  mTestSubscriber.assertNoErrors();
  mTestSubscriber.assertCompleted();
}
@H_301_1@项目:PocketBeer    @H_301_1@文件:RxReplaySubjectTester.java   
/**
 * When the subject signals an error after "One" and "Two", the subscriber must see
 * both values and the RuntimeException, and must not see the completion sent
 * afterwards.
 */
@Test
public void should_still_emits_all_items_and_throw_exception_but_not_onComplete_when_ReplaySubject_with_error() {
  Subject<String,String> subject = ReplaySubject.<String>create();
  subject.subscribe(mTestSubscriber);

  // when
  subject.onNext("One");
  subject.onNext("Two");
  subject.onError(new RuntimeException("Error occurs"));
  subject.onCompleted();

  // then
  mTestSubscriber.assertValues("One","Two");
  mTestSubscriber.assertError(RuntimeException.class);
  mTestSubscriber.assertNotCompleted();
}
@H_301_1@项目:PocketBeer    @H_301_1@文件:RxReplaySubjectTester.java   
/**
 * A value pushed directly into the test subscriber ("Three") must show up alongside
 * the values delivered through the subject ("One", "Two"), followed by completion.
 */
@Test
public void should_also_emit_item_from_subscriber_when_subscriber_executes_onNext_from_the_same_thread() {
  Subject<String,String> subject = ReplaySubject.<String>create();
  subject.subscribe(mTestSubscriber);

  // when
  subject.onNext("One");
  subject.onNext("Two");
  mTestSubscriber.onNext("Three");
  subject.onCompleted();

  // then
  mTestSubscriber.assertValues("One","Two","Three");
  mTestSubscriber.assertNoErrors();
  mTestSubscriber.assertCompleted();
}
@H_301_1@项目:progscrape-android    @H_301_1@文件:Model.java   
/**
 * Creates the feed and status subjects for a tag, performs an initial fetch with the
 * cache enabled, and wires a refresh trigger that re-fetches with the cache disabled
 * every time it fires.
 *
 * @param tag the tag to fetch; {@code fetch} treats {@code null} as the default feed
 * @return the refreshable wrapper around the feed subject and the refresh trigger
 */
private RefreshableRemoteData<Feed> getFeed(String tag) {
    ReplaySubject<RemoteStatus> statusSubject = ReplaySubject.createWithSize(1);
    ReplaySubject<Feed> feedSubject = ReplaySubject.createWithSize(1);
    PublishSubject<Void> refresh = PublishSubject.create();

    // Every refresh signal bypasses the cache.
    refresh.observeOn(scheduler).forEach($ ->
            fetch(tag, feedSubject, statusSubject, CacheMode.DISABLE_CACHE));

    // Initial load may be served from the cache. fetch takes four arguments:
    // tag, feed subject, status subject and cache mode.
    fetch(tag, feedSubject, statusSubject, CacheMode.ENABLE_CACHE);

    // NOTE(review): statusSubject is never exposed to the caller here; the
    // RefreshableRemoteData constructor may be meant to receive it as well — confirm.
    return new RefreshableRemoteData<>(feedSubject, refresh);
}
@H_301_1@项目:progscrape-android    @H_301_1@文件:Model.java   
/**
 * Marks the status as LOADING, then requests either the tag search or, when
 * {@code tag} is {@code null}, the default feed. When the feed arrives the status
 * becomes NOT_LOADING and the feed is published.
 *
 * @param tag           the tag to search for, or {@code null} for the default feed
 * @param FeedSubject   receives the fetched feed
 * @param statusSubject receives the loading status changes
 * @param mode          cache mode passed to the REST call
 */
private void fetch(String tag,ReplaySubject<Feed> FeedSubject,ReplaySubject<RemoteStatus> statusSubject,CacheMode mode) {
    statusSubject.onNext(RemoteStatus.LOADING);

    final Action1<Feed> publish = (loaded) -> {
        statusSubject.onNext(RemoteStatus.NOT_LOADING);
        FeedSubject.onNext(loaded);
    };

    if (tag != null) {
        rest.search(mode, tag).subscribe(publish);
    } else {
        rest.defaultFeed(mode).subscribe(publish);
    }
}
@H_301_1@项目:dhis2-android-eventcapture    @H_301_1@文件:RxRulesEngine.java   
/**
 * Initialises the rules engine state for the given event: loads the event, then zips
 * the rule engine for its program with the events of its organisation unit and
 * program, stores both, and creates a fresh rule-effect subject.
 *
 * @param eventUid uid of the event to initialise for
 * @return an Observable emitting {@code true} once the state has been set up
 */
public Observable<Boolean> init(String eventUid) {
    return eventInteractor.get(eventUid)
            .switchMap(new Func1<Event,Observable<? extends Boolean>>() {
                @Override
                public Observable<? extends Boolean> call(final Event event) {
                    final OrganisationUnit organisationUnit = new OrganisationUnit();
                    final Program program = new Program();

                    organisationUnit.setUId(event.getorgUnit());
                    program.setUId(event.getProgram());

                    return Observable.zip(loadRulesEngine(program),eventInteractor.list(organisationUnit,program),new Func2<RuleEngine,List<Event>,Boolean>() {
                                @Override
                                public Boolean call(RuleEngine engine,List<Event> events) {
                                    // assign rules engine
                                    ruleEngine = engine;
                                    currentEvent = event;

                                    // clear events map
                                    eventsMap.clear();

                                    // Put all existing events into the map. The zipped
                                    // `events` list already is the result of
                                    // eventInteractor.list(organisationUnit, program), so
                                    // no second, blocking query is issued here.
                                    eventsMap.putAll(ModelUtils.toMap(events));

                                    // ruleEffectSubject = BehaviorSubject.create();
                                    ruleEffectSubject = ReplaySubject.createWithSize(1);
                                    // subscribeOn()/observeOn() return new Observables and do
                                    // not modify the subject; the former bare calls discarded
                                    // their results and were no-ops, so they are omitted.
                                    // Apply schedulers where the subject is subscribed to.

                                    return true;
                                }
                            });
                }
            });
}
@H_301_1@项目:dhis2-android-eventcapture    @H_301_1@文件:LocationProviderImpl.java   
/**
 * Stops location updates: unregisters the listener from the system location service,
 * completes the current location subject and replaces it with a fresh one of
 * {@code BUFFER_SIZE}.
 */
@SuppressWarnings("MissingPermission")
@Override
public void stopUpdates() {
    Log.d(TAG,"stopUpdates()");

    final LocationManager locationManager =
            (LocationManager) context.getSystemService(Context.LOCATION_SERVICE);
    locationManager.removeUpdates(locationListener);

    // Finish the old stream, then prepare a new subject for later use.
    locationSubject.onCompleted();
    locationSubject = ReplaySubject.createWithSize(BUFFER_SIZE);
}
@H_301_1@项目:RxJavaFlow    @H_301_1@文件:ObservableTests.java   
/**
 * Feeds an error that is delayed by one second into a ReplaySubject, subscribes to
 * the subject without handlers, and then blocks for the subject's first materialized
 * notification before printing "Done".
 */
@Test
public void testErrorThrownIssue1685() {
    Subject<Object,Object> relay = ReplaySubject.create();

    Observable<Object> delayedFailure = Observable.error(new RuntimeException("oops"))
        .materialize()
        .delay(1,TimeUnit.SECONDS)
        .dematerialize();
    delayedFailure.subscribe(relay);

    relay.subscribe();
    relay.materialize().toBlocking().first();

    System.out.println("Done");
}
@H_301_1@项目:RxJavaFlow    @H_301_1@文件:OnSubscribeRefCountTest.java   
/**
 * Subscribes a live observer and an already-unsubscribed subscriber to the same
 * published/ref-counted source and verifies that the live observer still receives
 * 1, 2 and completion in order, with no error.
 */
@Test
public void testAlreadyUnsubscribedInterleavesWithClient() {
    ReplaySubject<Integer> source = ReplaySubject.create();

    // A subscriber that is unsubscribed before it is ever attached.
    Subscriber<Integer> done = Subscribers.empty();
    done.unsubscribe();

    @SuppressWarnings("unchecked")
    Observer<Integer> o = mock(Observer.class);
    // Mockito's ordered-verification type and factory are InOrder / inOrder(...).
    InOrder inOrder = inOrder(o);

    Observable<Integer> result = source.publish().refCount();

    result.subscribe(o);

    source.onNext(1);

    result.subscribe(done);

    source.onNext(2);
    source.onComplete();

    inOrder.verify(o).onNext(1);
    inOrder.verify(o).onNext(2);
    inOrder.verify(o).onComplete();
    verify(o,never()).onError(any(Throwable.class));
}
@H_301_1@项目:ground-control    @H_301_1@文件:CompositeRequestManager.java   
/**
 * Stores the collaborators of a composite request.
 *
 * @param composite The subject containing multiple child subjects.
 * @param subscription The subscription to cancel the main composite subject.
 * @param unsubscribeAction Action performed when a child unsubscribes.
 */
public CompositeRequestManager(
    ReplaySubject<ENTITY> composite,
    Subscription subscription,
    Action0 unsubscribeAction
) {
    this.unsubscribeAction = unsubscribeAction;
    this.subscription = subscription;
    this.composite = composite;
}
@H_301_1@项目:ground-control    @H_301_1@文件:SubscriptionFactory.java   
/**
 * Create or join with previous subscription for a collection of the entity.
 *
 * Ensures that the request logic is only run once and additional calls made
 * before completion will be added as a subscriber causing the observer
 * callbacks to be invoked, but not the on-subscribe logic.
 *
 * @param onSubscribe Logic to run for the request.
 * @param observer Callback to invoke on request events.
 * @param key A unique key to identify this request type separate from others.
 * @return A subscription for the observer that may be unsubscribed if necessary.
 */
final public Subscription createCollectionSubscription(
    OnSubscribe<List<ENTITY>> onSubscribe,Observer<List<ENTITY>> observer,String key
) {
    this.logger.trace("Creating collection subscription for Key: " + key);
    Observable<List<ENTITY>> callback = Observable.create(onSubscribe);
    callback = callback.subscribeOn(this.subscribeScheduler);
    callback = callback.observeOn(this.observeScheduler);

    CompositeRequestManager<List<ENTITY>> previousRequest = this.collectionRequests.get(key);
    if (null != previousRequest) {
        // A request for this key is already registered: attach to it.
        this.logger.debug("Joining with previous request.");
        return previousRequest.subscribe(observer);
    }

    this.logger.debug("No previous request to join.");
    ReplaySubject<List<ENTITY>> composite = ReplaySubject.create();
    Action0 unsubscribeCleanup = new UnsubscribeCleanup<List<ENTITY>>(this.logger,this.collectionRequests,key);
    // NOTE(review): CompleteCleanup is built without the request map, unlike
    // UnsubscribeCleanup above — confirm its constructor signature.
    Action0 completeCleanup = new CompleteCleanup<List<ENTITY>>(this.logger,key);

    callback = callback.doOnCompleted(completeCleanup);

    // The subject is the single real subscriber of the request; observers attach to it.
    Subscription mainSubscription = callback.subscribe(composite);
    CompositeRequestManager<List<ENTITY>> requestManager = new CompositeRequestManager<List<ENTITY>>(
        composite,mainSubscription,unsubscribeCleanup
    );
    this.collectionRequests.put(key,requestManager);
    return requestManager.subscribe(observer);
}
项目:ground-control    文件:SubscriptionFactory.java   
/**
 * Create or join with previous subscription for an entity.
 *
 * Ensures that the request logic is only run once and additional calls made
 * before completion will be added as a subscriber causing the observer
 * callbacks to be invoked, but not the on-subscribe logic.
 *
 * @param onSubscribe Logic to run for the request.
 * @param observer Callback to invoke on request events.
 * @param key A unique key to identify this request type separate from others.
 * @return A subscription for the observer that may be unsubscribed if necessary.
 */
final public Subscription createSubscription(
    OnSubscribe<ENTITY> onSubscribe, Observer<ENTITY> observer, String key
) {
    this.logger.trace("Creating subscription for Key: " + key);

    // If a request is already registered under this key, attach the observer to it
    // instead of building and starting a second request.
    CompositeRequestManager<ENTITY> previousRequest = this.requests.get(key);
    if (null != previousRequest) {
        this.logger.debug("Joining with previous request.");
        return previousRequest.subscribe(observer);
    }

    this.logger.debug("No previous request to join.");

    Action0 unsubscribeCleanup = new UnsubscribeCleanup<ENTITY>(this.logger, this.requests, key);
    Action0 completeCleanup = new CompleteCleanup<ENTITY>(this.logger, key);

    // Run the request on the subscribe scheduler, deliver its events on the observe
    // scheduler, and invoke the completion cleanup action once the request completes.
    // NOTE(review): doOnCompleted does not fire when the request terminates with an error;
    // confirm that failed requests are removed from requests elsewhere.
    Observable<ENTITY> callback = Observable.create(onSubscribe)
        .subscribeOn(this.subscribeScheduler)
        .observeOn(this.observeScheduler)
        .doOnCompleted(completeCleanup);

    // The replay subject is the only direct subscriber of the request; observers are
    // attached through the request manager that wraps it.
    ReplaySubject<ENTITY> composite = ReplaySubject.create();
    Subscription mainSubscription = callback.subscribe(composite);

    // Hand the main subscription to the manager, exactly as createCollectionSubscription
    // does; previously it was created here but never passed on.
    CompositeRequestManager<ENTITY> manager = new CompositeRequestManager<ENTITY>(
        composite, mainSubscription, unsubscribeCleanup
    );
    // Register the manager so that later calls with the same key can find and join it.
    this.requests.put(key, manager);

    return manager.subscribe(observer);
}

Angular 中的 Subject vs BehaviorSubject vs ReplaySubject

Angular 中的 Subject vs BehaviorSubject vs ReplaySubject

我一直在寻找了解这 3 个:

  • Subject
  • BehaviorSubject
  • ReplaySubject

我想使用它们并知道何时以及为什么,使用它们有什么好处,尽管我已经阅读了文档,观看了教程并搜索了谷歌,但我对此没有任何意义。

那么他们的目的是什么?一个真实的案例将不胜感激,它甚至不必编码。

我希望有一个清晰的解释,而不仅仅是“a+b => c you are subscribed to ....”

谢谢

答案1

小编典典

它实际上归结为行为和语义。对于

  • Subject - 订阅者只会获得订阅 之后 发出的发布值。问问自己,这是你想要的吗?订阅者是否需要了解有关先前值的任何信息?如果没有,那么您可以使用它,否则选择其他之一。例如,组件到组件的通信。假设您有一个组件在单击按钮时发布其他组件的事件。您可以使用带有 Subject 的服务进行通信。

  • BehaviorSubject- 最后一个值被缓存。订阅者将在初始订阅时获得最新值。该主题的语义是表示随时间变化的值。例如登录用户。初始用户可能是匿名用户。但是一旦用户登录,那么新值就是经过身份验证的用户状态。

BehaviorSubject 需要用一个初始值来初始化。这有时对编码偏好很重要。比如说你用 null 来初始化它,
然后在您的订阅中,您需要进行空检查。也许没问题,也许很烦人。

  • ReplaySubject- 它可以缓存指定数量的排放。任何订阅者都将在订阅时获得所有缓存的值。你什么时候需要这种行为?老实说,我没有任何需要这种行为,除了以下情况:

如果您用缓冲区大小 1 来初始化 ReplaySubject,那么它实际上的 行为 就像
BehaviorSubject。最后一个值总是被缓存,所以它就像一个随时间变化的值。有了这个,就不需要像用 null
初始化 BehaviorSubject 的情况那样进行 null 检查了。在这种情况下,在第一次发布之前,不会向订阅者发送任何值。

所以它真的归结为你所期望的行为(至于使用哪一个)。大多数时候,您可能想要使用
BehaviorSubject,因为您真正想要表示的是“随时间变化的价值”语义。但我个人认为用缓冲区大小初始化为 1 的 ReplaySubject 来替换没有任何问题。

当你真正需要的是一些缓存行为时,你想要 避免 的是使用普通(vanilla)的
Subject。例如,您正在编写路由守卫或解析器(resolver)。您在该守卫中获取一些数据并将其设置在
服务的 Subject 中。然后在路由组件中订阅服务的 Subject 以尝试获取在守卫中发出的值。哎呀。价值在哪里?它已经发出了,DUH。使用“缓存”主题!

BehaviorSubject与 Subject 不同之处

BehaviorSubject与 Subject 不同之处

需求

保持一个ShareModule不变,实现前台器具用户和计量机构注销功能

开始思路

1.增加一个实体
2.判断是器具用户登录,如果是则调用器具用户的注销,否则调用计量机构的注销(分别显示对应的菜单)

实现

创建一个对象,订阅之前的$isLogin 如果是true表示已登录,然后调用对应注销方法,实现代码如下:

    // Index of the profile entry in the tobars array (currently the first entry);
    // update profileMenuIndex whenever that entry is moved.
    private profileMenuIndex = 0;

    // Top-bar menu entries. `state` decides visibility (true = shown, false = hidden);
    // `onclickFn` is the click handler of the entry.
    tobars: Tobar[] = [
        {
            title: 'Profile',
            class: 'fa fa-fw fa-user',
            state: false,
            onclickFn: () => {
                this.router.navigateByUrl('/main/personal');
            }
        },
        {
            title: 'Privacy',
            class: 'fa fa-fw fa-user-secret',
            state: true,
            onclickFn: () => {
            }
        },
        {
            title: 'Settings',
            state: true,
            class: 'fa fa-fw fa-cog',
            onclickFn: () => {
            }
        },
        {
            title: 'Logout',
            state: true,
            class: 'fa fa-fw fa-sign-out',
            onclickFn: () => {
                // Each subscription below is cancelled immediately after it is created, so a
                // logout only happens if $isLogin delivers a value synchronously on subscribe.

                // Log out the instrument user.
                this.unsubscribeMain = this.departmentService.$isLogin.subscribe((isInstrumentUserLogin) => {
                    if (isInstrumentUserLogin) {
                        this.departmentLogout();
                    }
                });
                this.unsubscribeMain.unsubscribe();
                // Log out the metrology institution (admin) user.
                this.unsubscribeAdmin = this.systemService.$isLogin.subscribe((isAdminLogin) => {
                    if (isAdminLogin) {
                        this.logout();
                    }
                });
                this.unsubscribeAdmin.unsubscribe();
            }
        },
    ];

    // Reveal the profile menu entry whenever the department service reports a logged-in user.
    ngOnInit(): void {
        this.departmentService.$isLogin.subscribe((loggedIn) => {
            if (!loggedIn) {
                return;
            }
            this.showProfileMenu();
        });
    }
    
    // Sets the profile entry's state to true (true = shown, false = hidden).
    showProfileMenu(): void {
        const profileEntry = this.tobars[this.profileMenuIndex];
        profileEntry.state = true;
    }

然而并没有实现

问题: 计量机构的注销并不起作用
发现:我订阅的计量机构$isLogin不执行,所以说计量机构不能实现注销

clipboard.png
clipboard.png

看了潘老师之前写的$isLogin,发现俩个并不一样
private _$isLogin = new BehaviorSubject<boolean>(false);
public $isLogin = new Subject<boolean>();

在此感谢张喜硕组长的帮忙!!

BehaviorSubject 与 Subject 的区别

  • Subject

创建一个Rxjs Subject, 数据的类型是number

let subject1: Subject<number> = new Subject<number>();

然后我们使用 Subject 的 next 方法来 emit(发射)1 条数据

subject1.next(1);

接下来对subject1创建两个订阅,在subscription中直接打印接受到的数据

 subject1.subscribe((res: number) => console.info("subjectA ", res)); 
 subject1.subscribe((res: number) => console.info("subjectB ", res));

接下来我再发射两条数据

 subject1.next(2); 
 subject1.next(3);

结果

subjectA 2
subjectB 2
subjectA 3
subjectB 3

有时候我明明从数据源发射一个数据,但在订阅者拿到的值却是undefined或者null, 这就是因为订阅者是在数据源发射之后创建的,自然无法接收到数据了。
假如我们想在订阅者创建之后,无论什么时候都能拿到数据, 这应该怎么办呢? 那就要考虑使用BehaviorSubject了。

  • BehaviorSubject

创建一个BehaviorSubject, 默认值设为0. BehaviorSubject需要给个默认值
然后发射一条数据1,创建一个订阅者,再发射一条数据2,再创建一个订阅者,最后发射一条数据3。
代码如下:

// Create the BehaviorSubject with an initial value of 0.
let subject2: BehaviorSubject<number> = new BehaviorSubject<number>(0);
// Emit 1 before any subscriber exists.
subject2.next(1);
// Subscriber A joins now; per the output listed below, its first value is 1.
subject2.subscribe((res: number) => console.info("behavior-subjectA ", res));
// Emit 2; only subscriber A exists at this point.
subject2.next(2);
// Subscriber B joins now; per the output listed below, its first value is 2.
subject2.subscribe((res: number) => console.info("behavior-subjectB ", res));
// Emit 3; both subscribers receive it.
subject2.next(3);

结果

behavior-subjectA 1
behavior-subjectA 2
behavior-subjectB 2
behavior-subjectA 3
behavior-subjectB 3

由于BehaviorSubject是可以存储最后一条数据或者初始默认值的, 所以无论订阅者什么时候订阅到数据源subject2上, 都能接收到数据。
所以针对订阅者behavior-subjectA, 他订阅的时候,数据流里最后一条数据是1, 他能立即接收到。 然后依次能接收到最新的数据2和3。
针对订阅者behavior-subjectB, 他订阅的时候,数据流里最后一条数据是2, 他能立即接收到。 然后只能接收到最新的数据3了。

上述来源Subject四种主题解析,四种Subject特点如下:

clipboard.png

回到上述问题

所以说在订阅者,订阅他的时候之前的数据他是接受不到的,所以就出现了上述问题,修改之后

clipboard.png

clipboard.png

虽然是实现了,但是潘老师说这样思路是不对的,如果俩个用户同时登录呢,然后我试了一下果然出现了问题,菜单也不是我想要的了(自己考虑的还是不够啊!!)

新的思路

clipboard.png

实现代码如下(仅供参考):

  • share组件

app.tobar.service.ts

/**
 * Shared top-bar service. Holds the current menu entries in a BehaviorSubject so that
 * other parts of the application can replace them by calling `$tobars.next(...)`.
 */
export class AppTobarService {

    constructor(public systemService: SystemService,
                public departmentService: DepartmentService,
                public router: Router) {
    }

    // Default menu entries; every click handler is intentionally empty here.
    public $tobars = new BehaviorSubject([
        {
            title: 'Profile',
            class: 'fa fa-fw fa-user',
            onclickFn: () => {
            }
        },
        {
            title: 'Privacy',
            class: 'fa fa-fw fa-user-secret',
            onclickFn: () => {
            }
        },
        {
            title: 'Settings',
            class: 'fa fa-fw fa-cog',
            onclickFn: () => {
            }
        },
        {
            title: 'Logout',
            class: 'fa fa-fw fa-sign-out',
            onclickFn: () => {
            }
        },
    ]);
}

app.tobar.component.ts

// Keep the component's menu in sync with whatever the shared service publishes.
ngOnInit(): void {
    const menuStream = this.appTobarService.$tobars;
    menuStream.subscribe((publishedTobars: Tobar[]) => {
        this.tobars = publishedTobars;
    });
}
  • main组件

tobar.service.ts

/**
 * Top-bar menu definition for the main module.
 */
export class TobarService extends AppTobarService {

    // Returns the main module's menu entries: personal center and logout.
    getTobar(): any[] {
        return [
            {
                title: '个人中心',
                class: 'fa fa-fw fa-user',
                onclickFn: () => {
                    this.router.navigateByUrl('/main/personal');
                }
            },
            {
                title: '注销',
                class: 'fa fa-fw fa-sign-out',
                onclickFn: () => {
                    this.departmentService.loginOut();
                }
            },
        ];
    }
}

main.component.ts

ngOnInit() {
    // Initialize the top menu: publish this module's entries through the shared service.
    const menuEntries = this.tobarService.getTobar();
    this.appTobarService.$tobars.next(menuEntries);
}
  • admin组件

tobar.service.ts

/**
 * Top-bar menu definition for the admin module.
 */
export class TobarService extends AppTobarService {

    // Returns the admin module's menu entries: system settings and logout.
    getTobar(): any[] {
        return [
            {
                title: '系统设置',
                class: 'fa fa-fw fa-cog',
                onclickFn: () => {
                    this.router.navigateByUrl('/admin/system');
                }
            },
            {
                title: '注销',
                class: 'fa fa-fw fa-sign-out',
                onclickFn: () => {
                    this.systemService.logout();
                }
            },
        ];
    }
}

admin.component.ts

ngOnInit() {
    // Initialize the top menu: publish this module's entries through the shared service.
    const menuEntries = this.tobarService.getTobar();
    this.appTobarService.$tobars.next(menuEntries);
}

总结

1.假设我之前的思路可行,但是在之后修改起来会很麻烦,增加一个菜单时会有很多不定的因素产生(加上时间问题,当时怎么写的已经不记得了,会产生不可控的错误)。

2.之后的思路,也就是潘老师说的一句话:对扩展开放,对修改关闭。即使以后增加菜单,Share(模板)中永远保持不变(对修改关闭),想增加菜单只需改 Main 和 Admin 下的菜单就可以(对扩展开放)。

c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult

c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult

我正在尝试更新EDMX存储过程,我收到此错误:

无法隐式转换类型System.Data.Entity.Core.Objects.ObjectResult< X>到System.Data.Objects.ObjectResult< X>

我正在使用Visual Studio 2012.

解决方法

我有错误,这些解决方案都没有工作(我已经在使用System.Data.Entity.Core.Objects,它也在context.tt等).

我终于意识到键盘和椅子之间存在问题.存储过程完成了选择,但我试图:

MyStoredProc_Result r = dbcontext.MyStoredPoc();

代替

MyStoredProc_Result r = dbcontext.MyStoredPoc().FirstOrDefault();

com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码

com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码

项目:ibm-cos-sdk-java    文件:XmlResponsesSaxParser.java   
/**
 * Starts a fresh accumulator object when a "Deleted" or "Error" element begins
 * while {@code in("DeleteResult")} holds; every other element is ignored.
 */
@Override
protected void doStartElement(
        String uri, String name, String qName, Attributes attrs) {

    // NOTE(review): in(...) presumably reports whether the parser is currently
    // inside the given element - confirm against its definition.
    if (!in("DeleteResult")) {
        return;
    }

    if (name.equals("Deleted")) {
        currentDeletedobject = new Deletedobject();
        return;
    }

    if (name.equals("Error")) {
        currentError = new DeleteError();
    }
}
项目:herd    文件:MockS3OperationsImpl.java   
/**
 * Mock bulk delete: removes every requested key from the mock bucket and reports the
 * entries whose version record was actually removed.
 */
@Override
public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest, AmazonS3 s3Client)
{
    LOGGER.debug("deleteObjects(): deleteObjectRequest.getBucketName() = " + deleteObjectsRequest.getBucketName() + ",deleteObjectRequest.getKeys() = " +
        deleteObjectsRequest.getKeys());

    List<Deletedobject> removedEntries = new ArrayList<>();

    MockS3Bucket bucket = mockS3Buckets.get(deleteObjectsRequest.getBucketName());

    for (keyversion requestedKey : deleteObjectsRequest.getKeys())
    {
        String objectKey = requestedKey.getKey();
        String objectVersion = requestedKey.getVersion();

        // The version map is looked up by the key directly followed by the version
        // (nothing is appended when no version was given).
        String versionSuffix = "";
        if (objectVersion != null)
        {
            versionSuffix = objectVersion;
        }
        String versionLookupKey = objectKey + versionSuffix;

        bucket.getobjects().remove(objectKey);

        // Only report the entry when a version record existed and was removed.
        if (bucket.getVersions().remove(versionLookupKey) == null)
        {
            continue;
        }

        Deletedobject removed = new Deletedobject();
        removed.setKey(objectKey);
        removed.setVersionId(objectVersion);
        removedEntries.add(removed);
    }

    return new DeleteObjectsResult(removedEntries);
}
项目:aws-java-sdk-stubs    文件:AmazonS3Stub.java   
/**
 * {@inheritDoc}
 *
 * Versions are not supported by the stub at the moment, so only whole objects can be
 * deleted: every keyversion spec passed in removes the matching object from the stub
 * completely.
 *
 * @see com.amazonaws.services.s3.AmazonS3#deleteObjects(com.amazonaws.services.s3.model.DeleteObjectsRequest)
 */
@Override
public DeleteObjectsResult deleteObjects(final DeleteObjectsRequest deleteObjectsRequest) {
  final List<DeleteObjectsResult.Deletedobject> reported = new ArrayList<Deletedobject>();
  final BucketInfo targetBucket = getBucketInfo(deleteObjectsRequest.getBucketName());

  for (final keyversion spec : deleteObjectsRequest.getKeys()) {
    final boolean removed = targetBucket.deleteObject(spec.getKey());

    // Nothing is reported in quiet mode or when the key was not present.
    if (deleteObjectsRequest.getQuiet() || !removed) {
      continue;
    }

    final DeleteObjectsResult.Deletedobject entry = new DeleteObjectsResult.Deletedobject();
    entry.setKey(spec.getKey());
    reported.add(entry);
  }

  return new DeleteObjectsResult(reported);
}
项目:ibm-cos-sdk-java    文件:MultiObjectDeleteException.java   
/**
 * Creates the exception with a fixed message and copies both collections into
 * this instance's own lists.
 *
 * @param errors the delete errors to add to this exception's error list
 * @param deletedobjects the deleted objects to add to this exception's deleted-object list
 */
public MultiObjectDeleteException(Collection<DeleteError> errors,Collection<Deletedobject> deletedobjects) {
    super("One or more objects Could not be deleted");
    this.deletedobjects.addAll(deletedobjects);
    this.errors.addAll(errors);
}
项目:ibm-cos-sdk-java    文件:DeleteObjectsResponse.java   
/**
 * Creates a response backed by two new, empty lists (deleted objects and errors).
 */
public DeleteObjectsResponse() {
    this(new ArrayList<Deletedobject>(),new ArrayList<DeleteError>());
}
项目:ibm-cos-sdk-java    文件:DeleteObjectsResponse.java   
/**
 * Creates a response from the given lists. The lists are stored by reference,
 * not copied.
 *
 * @param deletedobjects the list to store as this response's deleted objects
 * @param errors the list to store as this response's errors
 */
public DeleteObjectsResponse(List<Deletedobject> deletedobjects,List<DeleteError> errors) {
    this.deletedobjects = deletedobjects;
    this.errors = errors;
}
项目:ibm-cos-sdk-java    文件:DeleteObjectsResponse.java   
/**
 * @return the deleted-object list held by this response (the stored reference, not a copy)
 */
public List<Deletedobject> getDeletedobjects() {
    return deletedobjects;
}
项目:ibm-cos-sdk-java    文件:DeleteObjectsResponse.java   
/**
 * Replaces the deleted-object list held by this response. The list is stored
 * by reference, not copied.
 *
 * @param deletedobjects the list to store
 */
public void setDeletedobjects(List<Deletedobject> deletedobjects) {
    this.deletedobjects = deletedobjects;
}
项目:ibm-cos-sdk-java    文件:MultiObjectDeleteException.java   
/**
 * Returns the list of successfully deleted objects from this request. If
 * {@link DeleteObjectsRequest#getQuiet()} is true, only error responses
 * will be returned from s3.
 *
 * @return the deleted-object list held by this exception (the stored reference, not a copy)
 */
public List<Deletedobject> getDeletedobjects() {
    return deletedobjects;
}

今天关于rx.subjects.ReplaySubject的实例源码rxjava源码解析的分享就到这里,希望大家有所收获,若想了解更多关于Angular 中的 Subject vs BehaviorSubject vs ReplaySubject、BehaviorSubject与 Subject 不同之处、c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult、com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码等相关知识,可以在本站进行查询。

本文标签: