在本文中,我们将带你了解rx.subjects.ReplaySubject的实例源码在这篇文章中,我们将为您详细介绍rx.subjects.ReplaySubject的实例源码的方方面面,并解答rxj
在本文中,我们将带你了解rx.subjects.ReplaySubject的实例源码在这篇文章中,我们将为您详细介绍rx.subjects.ReplaySubject的实例源码的方方面面,并解答rxjava源码解析常见的疑惑,同时我们还将给您一些技巧,以帮助您实现更有效的Angular 中的 Subject vs BehaviorSubject vs ReplaySubject、BehaviorSubject与 Subject 不同之处、c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult、com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码。
本文目录一览:- rx.subjects.ReplaySubject的实例源码(rxjava源码解析)
- Angular 中的 Subject vs BehaviorSubject vs ReplaySubject
- BehaviorSubject与 Subject 不同之处
- c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult
- com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码
rx.subjects.ReplaySubject的实例源码(rxjava源码解析)
protected HystrixObservable<Response> forceFallbackCommand(Invocation invocation) { return new HystrixObservable<Response>() { @Override public Observable<Response> observe() { ReplaySubject<Response> subject = ReplaySubject.create(); final Subscription sourceSubscription = toObservable().subscribe(subject); return subject.doOnUnsubscribe(sourceSubscription::unsubscribe); } @Override public Observable<Response> toObservable() { return Observable.create(f -> { try { f.onNext(FallbackPolicyManager.getFallbackResponse(handler.groupname,invocation)); } catch (Exception e) { f.onError(e); } }); } }; }
void replaySubject(){ ReplaySubject<Integer> replaySub = ReplaySubject.create(); replaySub.onNext(1); replaySub.onNext(2); Subscription subscription = replaySub.doOnSubscribe(new Action0() { @Override public void call() { Log.i(TAG,"Observer subscribed to ReplaySubject"); } }).doOnUnsubscribe(new Action0() { @Override public void call() { Log.i(TAG,"Observer unsubscribed to ReplaySubject "); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG,"New Event received from ReplaySubject: " + integer); } }); replaySub.onNext(3); replaySub.onNext(4); subscription.unsubscribe(); replaySub.onNext(5); replaySub.onCompleted(); }
@Test public void observe() throws Exception { ReplaySubject<Cursor> collector = ReplaySubject.create(); Subscription subscription = mSimplDb.get().observe(TableTest.class).subscribe(collector); mInsert.contentValues.put(DATA,"Data"); mSimplDb.get().insert(mInsert).toBlocking().single(); mInsert.contentValues.put(DATA,"data"); mSimplDb.get().insert(mInsert).toBlocking().single(); collector.onCompleted(); subscription.unsubscribe(); Cursor cursor = collector.toList().toBlocking().single().get(collector.size() - 1); assertEquals(2,cursor.getCount()); cursor.movetoFirst(); assertData("Data",cursor); cursor.movetoLast(); assertData("data",cursor); }
@Test public void testObserveAuth() throws Exception { Observable<Boolean> isAuthenticated = observeAuth(firebaseAuth) .map(new Func1<FirebaseUser,Boolean>() { @Override public Boolean call(FirebaseUser user) { return user != null; } }); ReplaySubject<Boolean> userState = ReplaySubject.create(); isAuthenticated.subscribe(userState); await(authAnonymously(firebaseAuth)); firebaseAuth.signOut(); List<Boolean> observedState = await(userState.take(3).toList()); assertthat(observedState,contains(false,true,false)); }
@Override public Observable<CharactersResponse> loadCharacter(String query,String privateKey,String publicKey,long timestamp) { if (characterSubscription == null || characterSubscription.isUnsubscribed()) { characterSubject = ReplaySubject.create(); // generate hash using timestamp and API keys String hash = HashGenerator.generate(timestamp,privateKey,publicKey); characterSubscription = api.getCharacters(query,publicKey,hash,timestamp) .subscribeOn(scheduler.backgroundThread()) .subscribe(characterSubject); } return characterSubject.asObservable(); }
@Override public Observable<String> getAdIdobservable() { // Observable that emits Google Advertising adIdSubscription if (adIdSubscription == null || adIdSubscription.isUnsubscribed()) { adIdSubject = ReplaySubject.create(); adIdSubscription = Observable .concat(getAdIdFromMemoryObservable(),getAdIdFromGoogleObservable()) .first(entity -> entity != null) // lets retry if something goes wrong! .retry(2) .subscribe(adIdSubject); } return adIdSubject.asObservable(); }
@Override public Observable<Boolean> getAdIdEnabledobservable() { // Observable that emits Google Tracking Enabled if (adIdEnabledSubscription == null || adIdEnabledSubscription.isUnsubscribed()) { adIdEnabledSubject = ReplaySubject.create(); adIdEnabledSubscription = Observable .concat(getAdIdEnabledFromMemoryObservable(),getAdIdEnabledFromGoogleObservable()) .first(entity -> entity != null) // lets retry if something goes wrong! .retry(2) .subscribe(adIdEnabledSubject); } return adIdEnabledSubject.asObservable(); }
@Test public void operatorUnbufferedobserveOn() throws Exception { ReplaySubject<Integer> numbers = ReplaySubject.createWithSize(1); final AtomicInteger counter = new AtomicInteger(); numbers.lift(new Rx.OperatorUnbufferedobserveOn<Integer>(Schedulers.immediate())) .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { counter.addAndGet(number); } }); numbers.onNext(3); numbers.onNext(3); numbers.onNext(3); assertEquals(9,counter.get()); }
/** * Creates an Observable stream from this queue. There should only ever be one subscriber to * this method. Calling this method twice will complete any prevIoUsly opened observable * (leaving unprocessed elements in the queue). * @return An observable containing the contents of the queue in order */ public Observable<T> toObservable() { synchronized (mlock) { if (mSubject != null) { mSubject.onCompleted(); } if (mQueue.isEmpty()) { mSubject = ReplaySubject.create(); } else { mSubject = ReplaySubject.create(mQueue.size()); for (T data : mQueue) { mSubject.onNext(data); } } } return mSubject.map(item -> { mQueue.remove(); return item; }).asObservable(); }
public static void replaySubject() { ReplaySubject<Integer> subject = ReplaySubject.create(); subject.onNext(5); Action1<Integer> action1 = integer -> Log.i("From action1",String.valueOf(integer)); Subscription subscription1 = subject.subscribe(action1); subject.onNext(10); Action1<Integer> action2 = integer -> Log.i("From action2",String.valueOf(integer)); Subscription subscription2 = subject.subscribe(action2); subject.onNext(20); subscription1.unsubscribe(); subject.onNext(40); subscription2.unsubscribe(); subject.onNext(80); }
@Override public Observable<T> call(final Observable<T> observable) { final ReplaySubject<Notification<T>> subject = ReplaySubject.create(); final Subscription subscription = observable.materialize().subscribe(subject); return view .switchMap(new Func1<Boolean,Observable<Notification<T>>>() { @Override public Observable<Notification<T>> call(final Boolean flag) { if (flag) { return subject; } else { return Observable.empty(); } } }) .doOnUnsubscribe(new Action0() { @Override public void call() { subscription.unsubscribe(); } }) .dematerialize(); }
/** * Builds and validates the structure of the expected header then * returns an {@link Observable} that can be used to submit info to the * {@link Network} * @return a new Publisher */ public Publisher build() { subject = ReplaySubject.createWithSize(3); for(int i = 0;i < HEADER_SIZE;i++) { if(lines[i] == null) { throw new IllegalStateException("Header not properly formed (must contain 3 lines) see Header.java"); } subject.onNext(lines[i]); } Publisher p = new Publisher(); p.subject = subject; if(notifier != null) { notifier.accept(p); } return p; }
@Override public final void zugSuchen(Stellung stellung,Observer<Zug> subject) { Collection<Zug> zuege = spielregeln.liefereGueltigeZuege(stellung); if (zuege.size() > 0) { ReplaySubject<BewerteterZug> suchErgebnisse = ReplaySubject.create(); aktuelleSuchErgebnisse = suchErgebnisse; ErgebnisMelden melder = new ErgebnisMelden(subject,zuege.size()); suchErgebnisse.subscribe(melder); for (Zug zug : zuege) { EinzelnenZugUntersuchen zugUntersuchen = new EinzelnenZugUntersuchen(stellung,zug,suchErgebnisse); suchErgebnisse.subscribe(zugUntersuchen); executorService.execute(zugUntersuchen); } } else { subject.onCompleted(); } }
private Observable<List<String>> getCityNames() { if (cityNames == null) { cityNames = ReplaySubject.create(1); geoNamesClient.getCities() .flatMap(cities -> Observable.from(cities.geonames)) .map(city -> { String format = String.format("%s,%s",city.name,city.countrycode); return format; }) .toList() .subscribe(cityNames); } Log.d(TAG,"Returning city names"); return cityNames.asObservable(); }
@Test public void testServeListObservables(){ // setup Observable<Integer> os1 = Observable.range(0,101); Observable<Integer> os2 = Observable.range(100,101); List<Observable<Integer>> toServe = new LinkedList<Observable<Integer>>(); toServe.add(os1); toServe.add(os2); ReplaySubject <List<Observable<Integer>>> subject = ReplaySubject.create(); subject.onNext(toServe); // serve PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000,9000); int serverPort = portSelector.acquirePort(); RemoteObservable.serveMany(serverPort,subject,Codecs.integer()); // connect Observable<Integer> oc = RemoteObservable.connect("localhost",serverPort,Codecs.integer()); // assert Observable.sumInteger(oc).toBlockingObservable().forEach(new Action1<Integer>(){ @Override public void call(Integer t1) { Assert.assertEquals(20200,t1.intValue()); // sum of number 0-200 } }); }
public ReplaySubject<String> saveBitmap(final Bitmap bitmap,final File folder,final String name) { bitmapSubject = ReplaySubject.create(); try { String path = FileUtil.savePhoto(bitmap,folder,name); onSuccess(path); } catch (IOException e) { e.printstacktrace(); onFail(e); } return bitmapSubject; }
@Override protected final void initObservable() { subject = ReplaySubject.create(); subject.onBackpressureBuffer() .observeOn(EventThread.getScheduler(observeThread)) .subscribeOn(EventThread.getScheduler(subscribeThread)) // .subscribe(new Observer<Object>() { // @Override // public void onCompleted() { // // } // // @Override // public void onError(Throwable e) { // // } // // @Override // public void onNext(Object event) { // try { // if (valid) { // handleEvent(event); // } // } catch (InvocationTargetException e) { // throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberReplayEvent.this,e); // } // } // }); .subscribe(event -> { try { if (valid) { handleEvent(event); } } catch (InvocationTargetException e) { throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberReplayEvent.this,e); } }); }
@Override public Observable<Response<LoginResponse>> login(String email,String password) { if (loginSubscription == null || loginSubscription.isUnsubscribed()) { loginSubject = ReplaySubject.create(); loginSubscription = api.login(email,password) .subscribeOn(scheduler.backgroundThread()) .subscribe(loginSubject); } return loginSubject.asObservable(); }
@Override public Observable<Response<ProfileResponse>> getProfile(String token,int userId) { if (profileSubscription == null || profileSubscription.isUnsubscribed()) { profileSubject = ReplaySubject.create(); profileSubscription = api.getProfile(token,userId) .subscribeOn(scheduler.backgroundThread()) .subscribe(profileSubject); } return profileSubject.asObservable(); }
@Override public Observable<Response<AvatarResponse>> setAvatar(String token,int userId,String avatarBase64) { if (avatarSubscription == null || avatarSubscription.isUnsubscribed()) { avatarSubject = ReplaySubject.create(); avatarSubscription = api.setAvatar(token,userId,avatarBase64) .subscribeOn(scheduler.backgroundThread()) .subscribe(avatarSubject); } return avatarSubject.asObservable(); }
@NonNull private ReplaySubject<Result<T>> getorCreateSubjectFor(Class<T> itemClass) { if (subjects.get(itemClass) != null) { return subjects.get(itemClass); } ReplaySubject<Result<T>> subject = ReplaySubject.create(); subjects.put(itemClass,subject); return subject; }
public Observable<Account> createAccount(final Activity callingActivity,final User me,final String user,final String password) { ReplaySubject<Account> subject = ReplaySubject.create(); // Create the account if necessary Account account = new Account(user,accountType); final Bundle exTradata = new Bundle(); exTradata.putString(USER_DATA,gson.toJson(me)); boolean accountCreated = accountManager.addAccountExplicitly(account,password,exTradata); if(accountCreated) { FetLife.getBus().post(new AccountCreatedEvent(account)); }else{ // If we didn't create the account,at least make sure to update it accountManager.setPassword(account,password); } subject.onNext(account); setCurrentAccount(account); if(callingActivity != null) { // Check whether we were launched explicitly to create the account Bundle extras = callingActivity.getIntent().getExtras(); if(extras != null && accountCreated) { AccountAuthenticatorResponse response = extras.getParcelable(AccountManager.KEY_ACCOUNT_AUTHENTICATOR_RESPONSE); if(response != null) { // Let them kNow we succeeded if so... Bundle result = new Bundle(); result.putString(AccountManager.KEY_ACCOUNT_NAME,user); result.putString(AccountManager.KEY_ACCOUNT_TYPE,accountType); response.onResult(result); } } } return subject.asObservable(); }
@Override public Observable<WeatherMix> loadWeather(String city) { if (weatherSubscription == null || weatherSubscription.isUnsubscribed()) { weatherSubject = ReplaySubject.create(); weatherSubscription = Observable.concat(memoryWeather(),diskWeather(),networkWeather(city)) .first(entity -> entity != null && isSameCity(city,entity) && isUpToDate(entity)) .subscribe(weatherSubject); } return weatherSubject.asObservable(); }
@Override public Observable<WeatherHistory> loadWeatherHistory(String city,long start,long end) { if (weatherHistorySubscription == null || weatherHistorySubscription.isUnsubscribed()) { weatherHistorySubject = ReplaySubject.create(); weatherHistorySubscription = networkWeatherHistory(city,start,end) .subscribe(weatherHistorySubject); } return weatherHistorySubject.asObservable(); }
@Test public void testObserve() throws Exception { ReplaySubject<String> values = ReplaySubject.create(); observe(reference) .map(new Func1<DataSnapshot,String>() { @Override public String call(DataSnapshot dataSnapshot) { return dataSnapshot.getValue(String.class); } }) .distinctUntilChanged() .subscribe(values); Observable .concat( setValue(reference,null),setValue(reference,"foo"),"bar"),null) ) .subscribe(Subscribers.<Void>empty()); List<String> observedValues = await(values.take(4).toList()); assertthat(observedValues,contains(null,"foo","bar",null)); }
public Observable<Net.HttpResponse> write (ByteString data) { Buffer buffer = Buffer.buffer (data.toByteArray ()); ReplaySubject<Net.HttpResponse> subject = ReplaySubject.createWithSize(1); request.handler (response -> { if (!okStatuses.contains (response.statusCode ())) { throw new IllegalStateException (format("Unexpected statusCode %s and message %s",response.statusCode (),response.statusMessage ())); } response.bodyHandler (body -> { subject.onNext ( new Net.HttpResponse () { @Override public BytesEvent data () { return Codecs.BYTES.from (body.getBytes ()); } @Override public int status () { return response.statusCode (); } }); subject.onCompleted (); }); }) .exceptionHandler ( ex -> subject.onError (ex)) .putHeader ("Content-Length",String.valueOf (buffer.length ())) .end (buffer); return subject; }
@Override public Observable<boolean[]> loadTables() { if (tablesSubscription == null || tablesSubscription.isUnsubscribed()) { tablesSubject = ReplaySubject.create(); tablesSubscription = api.getTables() .subscribeOn(scheduler.backgroundThread()) .observeOn(scheduler.mainThread()) .subscribe(tablesSubject); } return tablesSubject.asObservable(); }
@Override public Observable<CustomerResponse[]> loadCustomers() { if (customeRSSubscription == null || customeRSSubscription.isUnsubscribed()) { customeRSSubject = ReplaySubject.create(); customeRSSubscription = api.getCustomers() .subscribeOn(scheduler.backgroundThread()) .observeOn(scheduler.mainThread()) .subscribe(customeRSSubject); } return customeRSSubject.asObservable(); }
@Test public void should_emits_all_items_and_call_onComplete_without_throwing_exception_when_ReplaySubject_without_error() { Subject<String,String> replaySubject = ReplaySubject.<String>create(); replaySubject.subscribe(mTestSubscriber); replaySubject.onNext("One"); replaySubject.onNext("Two"); replaySubject.onCompleted(); mTestSubscriber.assertValues("One","Two"); mTestSubscriber.assertNoErrors(); mTestSubscriber.assertCompleted(); }
@Test public void should_still_emits_all_items_and_throw_exception_but_not_onComplete_when_ReplaySubject_with_error() { Subject<String,String> replaySubject = ReplaySubject.<String>create(); replaySubject.subscribe(mTestSubscriber); replaySubject.onNext("One"); replaySubject.onNext("Two"); replaySubject.onError(new RuntimeException("Error occurs")); replaySubject.onCompleted(); mTestSubscriber.assertValues("One","Two"); mTestSubscriber.assertError(RuntimeException.class); mTestSubscriber.assertNotCompleted(); }
@Test public void should_also_emit_item_from_subscriber_when_subscriber_executes_onNext_from_the_same_thread() { Subject<String,String> replaySubject = ReplaySubject.<String>create(); replaySubject.subscribe(mTestSubscriber); replaySubject.onNext("One"); replaySubject.onNext("Two"); mTestSubscriber.onNext("Three"); replaySubject.onCompleted(); mTestSubscriber.assertValues("One","Two","Three"); mTestSubscriber.assertNoErrors(); mTestSubscriber.assertCompleted(); }
private RefreshableRemoteData<Feed> getFeed(String tag) { ReplaySubject<RemoteStatus> statusSubject = ReplaySubject.createWithSize(1); ReplaySubject<Feed> FeedSubject = ReplaySubject.createWithSize(1); PublishSubject<Void> refresh = PublishSubject.create(); refresh.observeOn(scheduler).forEach($ -> { CacheMode mode = CacheMode.disABLE_CACHE; fetch(tag,FeedSubject,statusSubject,mode); }); fetch(tag,CacheMode.ENABLE_CACHE); return new RefreshableRemoteData<>(FeedSubject,refresh); }
private void fetch(String tag,ReplaySubject<Feed> FeedSubject,ReplaySubject<RemoteStatus> statusSubject,CacheMode mode) { statusSubject.onNext(RemoteStatus.LOADING); Action1<Feed> action = (x) -> { statusSubject.onNext(RemoteStatus.NOT_LOADING); FeedSubject.onNext(x); }; if (tag == null) { rest.defaultFeed(mode).subscribe(action); } else { rest.search(mode,tag).subscribe(action); } }
public Observable<Boolean> init(String eventUid) { return eventInteractor.get(eventUid) .switchMap(new Func1<Event,Observable<? extends Boolean>>() { @Override public Observable<? extends Boolean> call(final Event event) { final OrganisationUnit organisationUnit = new OrganisationUnit(); final Program program = new Program(); organisationUnit.setUId(event.getorgUnit()); program.setUId(event.getProgram()); return Observable.zip(loadRulesEngine(program),eventInteractor.list(organisationUnit,program),new Func2<RuleEngine,List<Event>,Boolean>() { @Override public Boolean call(RuleEngine engine,List<Event> events) { // assign rules engine ruleEngine = engine; currentEvent = event; // clear events map eventsMap.clear(); // put all existing events into map eventsMap.putAll(ModelUtils.toMap(eventInteractor.list( organisationUnit,program).toBlocking().first())); // ruleEffectSubject = BehaviorSubject.create(); ruleEffectSubject = ReplaySubject.createWithSize(1); ruleEffectSubject.subscribeOn(Schedulers.computation()); ruleEffectSubject.observeOn(AndroidSchedulers.mainThread()); return true; } }); } }); }
@SuppressWarnings("MissingPermission") @Override public void stopUpdates() { Log.d(TAG,"stopUpdates()"); ((LocationManager) context .getSystemService(Context.LOCATION_SERVICE)) .removeUpdates(locationListener); locationSubject.onCompleted(); locationSubject = ReplaySubject.createWithSize(BUFFER_SIZE); }
@Test public void testErrorThrownIssue1685() { Subject<Object,Object> subject = ReplaySubject.create(); Observable.error(new RuntimeException("oops")) .materialize() .delay(1,TimeUnit.SECONDS) .dematerialize() .subscribe(subject); subject.subscribe(); subject.materialize().toBlocking().first(); System.out.println("Done"); }
@Test public void testAlreadyUnsubscribedInterleavesWithClient() { ReplaySubject<Integer> source = ReplaySubject.create(); Subscriber<Integer> done = Subscribers.empty(); done.unsubscribe(); @SuppressWarnings("unchecked") Observer<Integer> o = mock(Observer.class); Inorder inorder = inorder(o); Observable<Integer> result = source.publish().refCount(); result.subscribe(o); source.onNext(1); result.subscribe(done); source.onNext(2); source.onComplete(); inorder.verify(o).onNext(1); inorder.verify(o).onNext(2); inorder.verify(o).onComplete(); verify(o,never()).onError(any(Throwable.class)); }
/** * @param composite The subject containing multiple child subjects. * @param subscription The subscription to cancel the main composite subject. * @param unsubscribeAction Action performed when a child unsubscribes. */ public CompositeRequestManager( ReplaySubject<ENTITY> composite,Subscription subscription,Action0 unsubscribeAction ) { this.composite = composite; this.subscription = subscription; this.unsubscribeAction = unsubscribeAction; }
/** * Create or join with prevIoUs subscription for a collection of the entity. * * Ensures that the request logic is only run once and additional calls made * before completion will be added as a subscriber causing the observer * callbacks to be invoked,but not the on-subscribe logic. * * @param onSubscribe Logic to run for the request. * @param observer Callback to invoke on request events. * @param key A unique key to identify this request type separate from others. * @return A subscription for the observer that may be unsubscribed if necessary. */ final public Subscription createCollectionSubscription( OnSubscribe<List<ENTITY>> onSubscribe,Observer<List<ENTITY>> observer,String key ) { this.logger.trace("Creating collection subscription for Key: " + key); Observable<List<ENTITY>> callback = Observable.create(onSubscribe); callback = callback.subscribeOn(this.subscribeScheduler); callback = callback.observeOn(this.observeScheduler); CompositeRequestManager<List<ENTITY>> prevIoUsRequest = this.collectionRequests.get(key); Subscription subscription; if (null == prevIoUsRequest) { this.logger.debug("No prevIoUs request to join."); ReplaySubject<List<ENTITY>> composite = ReplaySubject.create(); Action0 unsubscribeCleanup = new UnsubscribeCleanup<List<ENTITY>>(this.logger,this.collectionRequests,key); Action0 completeCleanup = new CompleteCleanup<List<ENTITY>>(this.logger,key); callback = callback.doOnCompleted(completeCleanup); Subscription mainSubscription = callback.subscribe(composite); CompositeRequestManager<List<ENTITY>> requestManager = new CompositeRequestManager<List<ENTITY>>( composite,mainSubscription,unsubscribeCleanup ); this.collectionRequests.put(key,requestManager); subscription = requestManager.subscribe(observer); } else { this.logger.debug("Joining with prevIoUs request."); subscription = prevIoUsRequest.subscribe(observer); } return subscription; }
/** * Create or join with prevIoUs subscription for an entity. * * Ensures that the request logic is only run once and additional calls made * before completion will be added as a subscriber causing the observer * callbacks to be invoked,but not the on-subscribe logic. * * @param onSubscribe Logic to run for the request. * @param observer Callback to invoke on request events. * @param key A unique key to identify this request type separate from others. * @return A subscription for the observer that may be unsubscribed if necessary. */ final public Subscription createSubscription( OnSubscribe<ENTITY> onSubscribe,Observer<ENTITY> observer,String key ) { this.logger.trace("Creating subscription for Key: " + key); Observable<ENTITY> callback = Observable.create(onSubscribe); callback = callback.subscribeOn(this.subscribeScheduler); callback = callback.observeOn(this.observeScheduler); CompositeRequestManager<ENTITY> prevIoUsRequest = this.requests.get(key); Subscription subscription; if (null == prevIoUsRequest) { this.logger.debug("No prevIoUs request to join."); ReplaySubject<ENTITY> composite = ReplaySubject.create(); Action0 unsubscribeCleanup = new UnsubscribeCleanup<ENTITY>(this.logger,this.requests,key); Action0 completeCleanup = new CompleteCleanup<ENTITY>(this.logger,key); callback = callback.doOnCompleted(completeCleanup); Subscription mainSubscription = callback.subscribe(composite); CompositeRequestManager<ENTITY> manager = new CompositeRequestManager<ENTITY>( composite,unsubscribeCleanup ); this.requests.put(key,manager); subscription = manager.subscribe(observer); } else { this.logger.debug("Joining with prevIoUs request."); subscription = prevIoUsRequest.subscribe(observer); } return subscription; }
Angular 中的 Subject vs BehaviorSubject vs ReplaySubject
我一直在寻找了解这 3 个:
- 主题
- 行为主体
- 重播主题
我想使用它们并知道何时以及为什么,使用它们有什么好处,尽管我已经阅读了文档,观看了教程并搜索了谷歌,但我对此没有任何意义。
那么他们的目的是什么?一个真实的案例将不胜感激,它甚至不必编码。
我希望有一个清晰的解释,而不仅仅是“a+b => c you are subscribed to ....”
谢谢
答案1
小编典典它实际上归结为行为和语义。带一个
Subject
- 订阅者只会获得订阅 后 发出的发布值。问问自己,这是你想要的吗?订阅者是否需要了解有关先前值的任何信息?如果没有,那么您可以使用它,否则选择其他之一。例如,组件到组件的通信。假设您有一个组件在单击按钮时发布其他组件的事件。您可以使用带有主题的服务进行通信。BehaviorSubject
- 最后一个值被缓存。订阅者将在初始订阅时获得最新值。该主题的语义是表示随时间变化的值。例如登录用户。初始用户可能是匿名用户。但是一旦用户登录,那么新值就是经过身份验证的用户状态。
用BehaviorSubject
初始值初始化。这有时对编码偏好很重要。比如说你用null
.
然后在您的订阅中,您需要进行空检查。也许没问题,也许很烦人。
ReplaySubject
- 它可以缓存指定数量的排放。任何订阅者都将在订阅时获得所有缓存的值。你什么时候需要这种行为?老实说,我没有任何需要这种行为,除了以下情况:
如果您ReplaySubject
使用 的缓冲区大小初始化 a 1
,那么它实际上的 行为 就像 aBehaviorSubject
。最后一个值总是被缓存,所以它就像一个随时间变化的值。有了这个,就不需要像用 a
初始化null
的情况那样进行检查了。在这种情况下,在第一次发布之前,不会向订阅者发送任何值。BehaviorSubject``null
所以它真的归结为你所期望的行为(至于使用哪一个)。大多数时候,您可能想要使用
aBehaviorSubject
因为您真正想要表示的是“随时间变化的价值”语义。ReplaySubject
但我个人认为用初始化的替换没有任何问题1
。
当你真正需要的是一些缓存行为时,你想要 避免 的是使用 vanilla
。Subject
例如,您正在编写路由保护或解析。您在该守卫中获取一些数据并将其设置在
serviceSubject
中。然后在路由组件中订阅服务主题以尝试获取在防护中发出的值。哎呀。价值在哪里?它已经发出了,DUH。使用“缓存”主题!
BehaviorSubject与 Subject 不同之处
需求
保持一个ShareModule不变,实现前台器具用户和计量机构注销功能
开始思路
1.增加一个实体
2.判断是器具用户登录,如果是则调用器具用户的注销,否则调用计量机构的注销(分别显示对应的菜单)
实现
创建一个对象,订阅之前的$isLogin
如果是true
表示已登录,然后调用对应注销方法,实现代码如下:
// tobar数组的第一对象的位置,改变位置时需改变profileMenuIndex
private profileMenuIndex = 0;
tobars: Tobar[] = [
{
title: ''Profile'',
class: ''fa fa-fw fa-user'',
state: false,
onclickFn: () => {
this.router.navigateByUrl(''/main/personal'');
}
},
{
title: ''Privacy'',
class: ''fa fa-fw fa-user-secret'',
state: true,
onclickFn: () => {
}
},
{
title: ''Settings'',
state: true,
class: ''fa fa-fw fa-cog'',
onclickFn: () => {
}
},
{
title: ''Logout'',
state: true,
class: ''fa fa-fw fa-sign-out'',
onclickFn: () => {
// 器具用户注销
this.unsubscribeMain = this.departmentService.$isLogin.subscribe((isInstrumentUserLogin) => {
if (isInstrumentUserLogin) {
this.departmentLogout();
}
});
this.unsubscribeMain.unsubscribe();
// 计量机构注销
this.unsubscribeAdmin = this.systemService.$isLogin.subscribe((isAdminLogin) => {
if (isAdminLogin) {
this.logout();
}
});
this.unsubscribeAdmin.unsubscribe();
}
},
];
ngOnInit(): void {
this.departmentService.$isLogin.subscribe((isLogin) => {
if (isLogin) {
this.showProfileMenu();
}
});
}
// 改变state,true显示,false不显示
showProfileMenu(): void {
this.tobars[this.profileMenuIndex].state = true;
}
然而并没有实现
问题: 计量机构的注销并不起作用
发现:我订阅的计量机构$isLogin
不执行,所以说计量机构不能实现注销
看了潘老师之前写的$isLogin
,发现俩个并不一样private _$isLogin = new
BehaviorSubject<boolean>(false);
public $isLogin = new
Subject<boolean>();
在此感谢张喜硕组长的帮忙!!
BehaviorSubject
与 Subject
区别
Subject
创建一个Rxjs Subject
, 数据的类型是number
let subject1: Subject<number> = new Subject<number>();
然后我们使用Subject
的next
方法来emit
(发射)1条数据
subject1.next(1);
接下来对subject1
创建两个订阅,在subscription
中直接打印接受到的数据
subject1.subscribe((res: number) => console.info("subjectA ", res));
subject1.subscribe((res: number) => console.info("subjectB ", res));
接下来我在发射两条数据
subject1.next(2);
subject1.next(3);
结果
subjectA 2
subjectB 2
subjectA 3
subjectB 3
有时候我明明从数据源发射一个数据,但在订阅者拿到的值却是undefined
或者null
, 这就是因为订阅者是在数据源发射之后创建的,自然无法接收到数据了。
假如我们想在订阅者创建之后,无论什么时候都能拿到数据, 这应该怎么办呢? 那就要考虑使用BehaviourSubject
了。
BehaviourSubject
创建一个BehaviorSubject
, 默认值设为0. BehaviorSubject
需要给个默认值
然后发射一条数据1,创建一个订阅者,再发射一条数据2,再创建一个订阅者,最后发射一条数据3。
代码如下:
let subject2: BehaviorSubject<number> = new BehaviorSubject<number>(0);
subject2.next(1);
subject2.subscribe((res: number) => console.info("behavior-subjectA ", res));
subject2.next(2);
subject2.subscribe((res: number) => console.info("behavior-subjectB ", res));
subject2.next(3);
结果
behavior-subjectA 1
behavior-subjectA 2
behavior-subjectB 2
behavior-subjectA 3
behavior-subjectB 3
由于BehaviorSubject
是可以存储最后一条数据或者初始默认值的, 所以无论订阅者什么时候订阅到数据源subject2
上, 都能接收到数据。
所以针对订阅者behavior-subjectA
, 他订阅的时候,数据流里最后一条数据是1, 他能立即接收到。 然后依次能接收到最新的数据2和3。
针对订阅者behavior-subjectB
, 他订阅的时候,数据流里最后一条数据是2, 他能立即接收到。 然后只能能接收到最新的数据3了。
上述来源Subject四种主题解析,四种Subject
特点如下:
回到上述问题
所以说在订阅者,订阅他的时候之前的数据他是接受不到的,所以就出现了上述问题,修改之后
虽然是实现了,但是潘老师说这样思路是不对的,如果俩个用户同时登录呢,然后我试了一下果然出现了问题,菜单也不是我想要的了(自己考虑的还是不够啊!!)
新的思路
实现代码如下(及供参考):
- share组件
app.tobar.service.ts
export class AppTobarService {
constructor(public systemService: SystemService,
public departmentService: DepartmentService,
public router: Router) {
}
public $tobars = new BehaviorSubject([
{
title: ''Profile'',
class: ''fa fa-fw fa-user'',
onclickFn: () => {
}
},
{
title: ''Privacy'',
class: ''fa fa-fw fa-user-secret'',
onclickFn: () => {
}
},
{
title: ''Settings'',
class: ''fa fa-fw fa-cog'',
onclickFn: () => {
}
},
{
title: ''Logout'',
class: ''fa fa-fw fa-sign-out'',
onclickFn: () => {
}
},
]);
}
app.tobar.component.ts
ngOnInit(): void {
this.appTobarService.$tobars.subscribe((tobars: Tobar[]) => {
this.tobars = tobars;
});
}
- main组件
tobar.service.ts
export class TobarService extends AppTobarService {
getTobar(): any[] {
return [
{
title: ''个人中心'',
class: ''fa fa-fw fa-user'',
onclickFn: () => {
this.router.navigateByUrl(''/main/personal'');
}
},
{
title: ''注销'',
class: ''fa fa-fw fa-sign-out'',
onclickFn: () => {
this.departmentService.loginOut();
}
},
];
}
}
main.component.ts
ngOnInit() {
// 初始化顶部菜
this.appTobarService.$tobars.next(this.tobarService.getTobar());
}
- admin组件
tobar.service.ts
export class TobarService extends AppTobarService {
getTobar(): any[] {
return [
{
title: ''系统设置'',
class: ''fa fa-fw fa-cog'',
onclickFn: () => {
this.router.navigateByUrl(''/admin/system'');
}
},
{
title: ''注销'',
class: ''fa fa-fw fa-sign-out'',
onclickFn: () => {
this.systemService.logout();
}
},
];
}
}
admin.component.ts
ngOnInit() {
// 初始化顶部菜单
this.appTobarService.$tobars.next(this.tobarService.getTobar());
}
总结
1.假设我之前的思路可行,但是在之后修改起来会很麻烦,增加一个菜单时会有很多不定的因素产生(加上时间问题,当时怎么写的已经不记得了,会产生不可控的错误)。
2.之后的思路,也就是潘老师说的一句话对扩展开放,对修改关闭
,及时以后增加菜单,Share(模板)中永远保持不变(对修改关闭),想增加菜单只需改Main
或Admin
下的菜单就可以(对扩展开放)。
c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult
无法隐式转换类型System.Data.Entity.Core.Objects.ObjectResult< X>到System.Data.Objects.ObjectResult< X>
我正在使用Visual Studio 2012.
解决方法
我终于意识到键盘和椅子之间存在问题.存储过程完成了选择,但我试图:
MyStoredProc_Result r = dbcontext.MyStoredPoc();
代替
MyStoredProc_Result r = dbcontext.MyStoredPoc().FirstOrDefault();
com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码
@Override protected void doStartElement( String uri,String name,String qName,Attributes attrs) { if (in("DeleteResult")) { if (name.equals("Deleted")) { currentDeletedobject = new Deletedobject(); } else if (name.equals("Error")) { currentError = new DeleteError(); } } }
@Override public DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteObjectsRequest,AmazonS3 s3Client) { LOGGER.debug("deleteObjects(): deleteObjectRequest.getBucketName() = " + deleteObjectsRequest.getBucketName() + ",deleteObjectRequest.getKeys() = " + deleteObjectsRequest.getKeys()); List<Deletedobject> deletedobjects = new ArrayList<>(); MockS3Bucket mockS3Bucket = mockS3Buckets.get(deleteObjectsRequest.getBucketName()); for (keyversion keyversion : deleteObjectsRequest.getKeys()) { String s3ObjectKey = keyversion.getKey(); String s3ObjectVersion = keyversion.getVersion(); String s3Objectkeyversion = s3ObjectKey + (s3ObjectVersion != null ? s3ObjectVersion : ""); mockS3Bucket.getobjects().remove(s3ObjectKey); if (mockS3Bucket.getVersions().remove(s3Objectkeyversion) != null) { Deletedobject deletedobject = new Deletedobject(); deletedobject.setKey(s3ObjectKey); deletedobject.setVersionId(s3ObjectVersion); deletedobjects.add(deletedobject); } } return new DeleteObjectsResult(deletedobjects); }
/** * {@inheritDoc} * * The stub does not currently support versions so this stub only supports full deletes of the objects. Passing in a * list of keyversion specs to be deleted will completely delete those objects from the stubs. * * @see com.amazonaws.services.s3.AmazonS3#deleteObjects(com.amazonaws.services.s3.model.DeleteObjectsRequest) */ @Override public DeleteObjectsResult deleteObjects(final DeleteObjectsRequest deleteObjectsRequest) { final List<DeleteObjectsResult.Deletedobject> deletedobjects = new ArrayList<Deletedobject>(); final BucketInfo bucket = getBucketInfo(deleteObjectsRequest.getBucketName()); for (final keyversion key : deleteObjectsRequest.getKeys()) { final boolean found = bucket.deleteObject(key.getKey()); if (!deleteObjectsRequest.getQuiet() && found) { final DeleteObjectsResult.Deletedobject result = new DeleteObjectsResult.Deletedobject(); result.setKey(key.getKey()); deletedobjects.add(result); } } return new DeleteObjectsResult(deletedobjects); }
public MultiObjectDeleteException(Collection<DeleteError> errors,Collection<Deletedobject> deletedobjects) { super("One or more objects Could not be deleted"); this.deletedobjects.addAll(deletedobjects); this.errors.addAll(errors); }
public DeleteObjectsResponse() { this(new ArrayList<Deletedobject>(),new ArrayList<DeleteError>()); }
public DeleteObjectsResponse(List<Deletedobject> deletedobjects,List<DeleteError> errors) { this.deletedobjects = deletedobjects; this.errors = errors; }
public List<Deletedobject> getDeletedobjects() { return deletedobjects; }
public void setDeletedobjects(List<Deletedobject> deletedobjects) { this.deletedobjects = deletedobjects; }
/** * Returns the list of successfully deleted objects from this request. If * {@link DeleteObjectsRequest#getQuiet()} is true,only error responses * will be returned from s3. */ public List<Deletedobject> getDeletedobjects() { return deletedobjects; }
今天关于rx.subjects.ReplaySubject的实例源码和rxjava源码解析的分享就到这里,希望大家有所收获,若想了解更多关于Angular 中的 Subject vs BehaviorSubject vs ReplaySubject、BehaviorSubject与 Subject 不同之处、c# – 无法将System.Data.Entity.Core.Objects.ObjectResult类型隐式转换为System.Data.Objects.ObjectResult、com.amazonaws.services.s3.model.DeleteObjectsResult.DeletedObject的实例源码等相关知识,可以在本站进行查询。
本文标签: