From 34358eb423853792c418618f6441a5b6fc3e861d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Fri, 7 Apr 2023 09:13:15 +0900 Subject: [PATCH 1/7] =?UTF-8?q?[1=EC=9E=A5]=20Rx=EC=8B=A4=EC=8A=B5=20?= =?UTF-8?q?=EC=A0=84=EA=B9=8C=EC=A7=80=20=EB=82=B4=EC=9A=A9=20=EC=A0=95?= =?UTF-8?q?=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/kotlin/ezhoon/Main.kt | 2 - src/main/kotlin/kms/Main.kt | 10 --- src/main/kotlin/kms/chapter01/README.md | 106 ++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 12 deletions(-) delete mode 100644 src/main/kotlin/kms/Main.kt create mode 100644 src/main/kotlin/kms/chapter01/README.md diff --git a/src/main/kotlin/ezhoon/Main.kt b/src/main/kotlin/ezhoon/Main.kt index facf4e7..a21433e 100644 --- a/src/main/kotlin/ezhoon/Main.kt +++ b/src/main/kotlin/ezhoon/Main.kt @@ -1,7 +1,5 @@ package ezhoon -import io.reactivex.rxjava3.core.Flowable - fun main() { Flowable.just("Hello world").subscribe { x: String? -> diff --git a/src/main/kotlin/kms/Main.kt b/src/main/kotlin/kms/Main.kt deleted file mode 100644 index 4b06c93..0000000 --- a/src/main/kotlin/kms/Main.kt +++ /dev/null @@ -1,10 +0,0 @@ -package kms - -import io.reactivex.rxjava3.core.Flowable - - -fun main() { - Flowable.just("Hello world").subscribe { x: String? -> - println(x) - } -} \ No newline at end of file diff --git a/src/main/kotlin/kms/chapter01/README.md b/src/main/kotlin/kms/chapter01/README.md new file mode 100644 index 0000000..1633282 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/README.md @@ -0,0 +1,106 @@ +### 리액티브 프로그래밍 + +- 이벤트나 데이터가 극단적으로 증가하여 대용량 데이터의 저장, 업데이트, 실시간 반영을 효율적으로 해결할 방법으로 주목되었다 +- 2013 년에 마이크로소프트에서 프로그래밍 모델을 비동기 및 이벤트 중심의 데이터 구조로 재정의한 Reactive Extensions 를 자바 진영으로 가져온것 + - NetFlix가 공개한 라이브러리 + +### Reactive Streams 규칙 + +- onSubscribe 는 해당 구독에서 한번 만 발생한다 +- 통지는 순차적으로 이루어진다 +- null을 통지하지 않는다 +- Publisher의 처리는 onComplete 나 onError 를 통지해 종료한다 + +### RxJava 기본 구조 + +- 데이터를 만들고 통지하는 **생산자** +- 통지된 데이터를 받아 처리하는 **소비자** + +### 생산자 - 소비자 + +#### Flowable - Subscriber +- Reactive Streams 지원 + - 기본적인 매커니즘이 Reactive Streams 와 같다 + - Subscription 으로 **데이터 개수 요청**과 **구독 해지** +- Flowable + - Reactive Streams 의 생산자인 Publisher를 구현 +- Subscriver + - Reactive Streams 의 클래스 + +#### Observable - Observer +- RxJava 2.x 버전 +- Reactive Streams 미지원 + - 기본적인 매커니즘은 Flowable - Subscriber 와 거의 같다 +- 데이터 개수를 제어하는 **배압 기능이 없다** + - 데이터 개수를 요청하지 않음 + - 구독 해지 메서드가 있는 인터페이스 **Disposable** 을 사용 + +### 연산자 + +- 소비자에게 데이터를 통지하기 전에 불필요한 데이터를 삭제한다 +- 소비자가 사용하기 쉽게 데이터를 변환한다 +- 연산자가 설정된 시점이 아닌 **데이터가 통지 받는 시점**에 처리가 실행된다 +- 함수형 프로그래밍의 영향을 받아 **Side Effect** 를 피하는게 좋다 + - 체인 도중이 아닌 소비자 측에서 하는게 좋다 + - 여러 스레드에서 공유하는 객체가 없어져 쓰레드 안전을 보장할 수 있다 + +### 비동기 처리 + +- 개발자가 직접 스레드를 관리할 필요 없이 처리 목적에 맞춰 **스케줄러**를 설정 + - 데이터를 통지하는 부분과 처리하는 부분에 지정할 수 있다 + +### Cold 생산자와 Hot 생산자 + +#### Cold 생산자 + +- 1개의 소비자와 구독 관계를 맺는다 +- 새로운 구독이 생기면 데이터를 처음부터 받는다 +- RxJava 의 기본 생산자는 Cold + +#### Hot 생산자 + +- 여러 소비자와 구독 관계를 맺는다 +- 구독한 시점부터 생산된 데이터를 받게 된다 + +#### Hot 생산자 생성하기 + +- Cold 에서 Hot 으로 변환하는 메서드를 호출 +- Processor 와 Subject 를 생성 + +### ConnectableFlowable/ ConnectableObservable +- **Hot 생산자** +- Cold 를 Hot으로 변환하는 연산자로 생성할 수 있다 +- subscribe 를 호출해도 처리를 시작하지 않고 **connect 를 호출해야 처리 시작** +- 처음 부터 여러 구독자에게 데이터를 통지할 수 있음 +- refCount 를 이용해서 일반 Flowable/Observable 을 반환 할 수 있음 + - 같은 timeline 에서 생성되는 데이터 통지 + - Connectable 이 아니기 때문에 connect 가 아닌 subscribe 로 데이터 처리 시작 + +### Flowable/Observable 을 Cold 에서 Hot 으로 변환 + +- publish() +- replay() / replay(int buffsize) / replay(long time, TimeUnit unit) +- share() + +#### publish + +- Cold 생산자에서 Connectable 을 생성하는 연산자 +- 처리를 시작한 뒤에 구독하면 **구독한 이후에 생성된 데이터 부터 통지** + +#### replay + +- Cold 생산자에서 Connectable 을 생성하는 연산자 +- 통지한 데이터를 캐시하여 처리를 시작한 뒤에 구독하면 캐시된 데이터를 먼저 통지 +- 인자가 없으면 **모든 데이터를 캐시** + +#### share + +- 여러 소비자가 구독할 수 있는 Flowable/Observable(Hot) 을 생성 +- **Connectable 을 생성하지 않는다** +- 실질적으로는 flowable.publish().refCount() 와 같다 + +>[!note] 소비자의 처리 속도가 느린 경우 +>- 소비자들이 같은 데이터를 같은 시점에 받지 않을 수 있습니다. +>- 이미 구독하고 있던 소비자는 버퍼에 있는데이터를 통지 받음 +>- 새로 구독한 소비자는 최신 데이터를 통지 받음 + From faade98bc9e98d125ba21ee9ba310fff2446e2cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Tue, 11 Apr 2023 22:21:10 +0900 Subject: [PATCH 2/7] =?UTF-8?q?[1=EC=9E=A5]=20=EC=98=88=EC=A0=9C=2011=20?= =?UTF-8?q?=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kms/chapter01/L11_FlowableSample.kt | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 src/main/kotlin/kms/chapter01/L11_FlowableSample.kt diff --git a/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt b/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt new file mode 100644 index 0000000..b967273 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt @@ -0,0 +1,53 @@ +package kms.chapter01 + +import io.reactivex.BackpressureStrategy +import io.reactivex.Flowable +import io.reactivex.FlowableEmitter +import io.reactivex.FlowableOnSubscribe +import io.reactivex.schedulers.Schedulers +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription + +fun main() { + val flowable = Flowable.create( + object : FlowableOnSubscribe { + override fun subscribe(emitter: FlowableEmitter) { + val strArr = arrayOf("Hello, World!", "안녕, RxJava!") + for (str in strArr) { + if (emitter.isCancelled) { + return + } + emitter.onNext(str) + } + emitter.onComplete() + } + }, + BackpressureStrategy.BUFFER, + ) + flowable + .observeOn(Schedulers.computation()) + .subscribe(object : Subscriber { + + private var subscription: Subscription? = null + + override fun onSubscribe(s: Subscription?) { + subscription = s + subscription?.request(1L) + } + + override fun onNext(data: String?) { + println("${Thread.currentThread().name}: $data") + subscription?.request(1L) + } + + override fun onError(error: Throwable?) { + error?.printStackTrace() + } + + override fun onComplete() { + println("${Thread.currentThread().name}: 완료") + } + }) + + Thread.sleep(500L) +} From aa27d7bf6a247b023bebd223a417fbd69b13d615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Tue, 11 Apr 2023 23:28:25 +0900 Subject: [PATCH 3/7] =?UTF-8?q?[1=EC=9E=A5]=20Flowable=20=EC=98=88?= =?UTF-8?q?=EC=A0=9C=20=EC=84=A4=EB=AA=85=20=EB=B0=8F=20=EC=B6=94=EA=B0=80?= =?UTF-8?q?=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kms/chapter01/L11_FlowableSample.kt | 25 ++++++++++++++++- src/main/kotlin/kms/chapter01/README.md | 28 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt b/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt index b967273..f041112 100644 --- a/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt +++ b/src/main/kotlin/kms/chapter01/L11_FlowableSample.kt @@ -8,42 +8,65 @@ import io.reactivex.schedulers.Schedulers import org.reactivestreams.Subscriber import org.reactivestreams.Subscription -fun main() { +private fun main() { val flowable = Flowable.create( object : FlowableOnSubscribe { + + // FlowableEmitter 가 Subscriber 에게 데이터를 통지한다 + // create 의 구현을 따라가면 + // 에러가 발생하면 catch 해서 onError 로 전달하는 부분이 존재함 + // 단, 치명적인 에러라면 다시 Throw 를 던짐 + // https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495 override fun subscribe(emitter: FlowableEmitter) { val strArr = arrayOf("Hello, World!", "안녕, RxJava!") for (str in strArr) { + // 구동 해지 상태에서 종료하지 않고 onNext 를 진행해도 데이터를 통지하지 않는다 + // 단, Rx 에서 해주는 것은 통지하지 않는 것이지 계속 진행은 하므로 직접 처리하는게 좋다 if (emitter.isCancelled) { return } + // 데이터를 통지한다 + // 만약 null 을 전달하면 NullPointException 이 발생한다 emitter.onNext(str) } + // onComplete 를 통지하면 그 이후엔 아무것도 통지하면 안된다 emitter.onComplete() } }, + // BackpressureStrategy 에 따라 다른 Emitter 를 생성 BackpressureStrategy.BUFFER, ) flowable + // 데이터를 받는 측의 쓰레드를 변경할 때 사용 .observeOn(Schedulers.computation()) + // Flowable 는 Publisher 인터페이스를 구현했기 때문에 Subscriber 와의 상호작용을 외부에서 영향을 받지 않는다 .subscribe(object : Subscriber { + // Subscriber 가 받을 데이터의 개수를 요청 및 구독 해지할 수 있는 인터페이스 + // onNext에서 직접 배압을 처리하기 위해서 subscription 을 멤버 변수로 저장 private var subscription: Subscription? = null override fun onSubscribe(s: Subscription?) { subscription = s + // 요청 데이터의 개수를 MAX 로 처리하면 onNext 에서 더 이상 요청하지 않아도 됨 + // onSubscribe 에서 request를 호출하지 않으면 데이터르 받을 수 없다 + // request는 onSubscribe 의 가장 마지막에서 호출 해야함 subscription?.request(1L) } + // Flowable 에서 데이터를 받으면 호출 되는 메서드 override fun onNext(data: String?) { println("${Thread.currentThread().name}: $data") subscription?.request(1L) } + // 에러가 발생했거나 에러를 통지할 때 실행되는 메서드 + // onError 이후에는 onNext 나 onComplete 가 실행되지 않는다 override fun onError(error: Throwable?) { error?.printStackTrace() } + // 모든 데이터의 통지를 끝내고 처리가 완료됐을 때 실행되는 메서드 override fun onComplete() { println("${Thread.currentThread().name}: 완료") } diff --git a/src/main/kotlin/kms/chapter01/README.md b/src/main/kotlin/kms/chapter01/README.md index 1633282..0b798cf 100644 --- a/src/main/kotlin/kms/chapter01/README.md +++ b/src/main/kotlin/kms/chapter01/README.md @@ -104,3 +104,31 @@ >- 이미 구독하고 있던 소비자는 버퍼에 있는데이터를 통지 받음 >- 새로 구독한 소비자는 최신 데이터를 통지 받음 +### Flowable 시퀀스 + +1. Subscriber 가 Flowable을 구독한다(**Subscribe**) +2. Flowable이 **Subscription**을 생성한다 +3. Flowable이 Subscriber에 구독 시작(**onSubscribe**)을 통지하여 Subscription을 전달 +4. Subscriber는 Subscription에 데이터를 통지하게 요청 +5. Flowable은 데이터를 Subscriber에게 통지 +6. Subscriber는 받은 데이터를 처리한다 +7. 처리한 후 Subscriber는 Subscription에게 데이터 통지를 요청 +8. Flowable은 데이터가 있다면 그 데이터를 Subscriber에게 통지 +9. Subscriber는 받은 데이터를 처리한다 +10. 처리한 후 Subscriber는 Subscription에게 데이터 통지를 요청 +11. Flowable 은 데이터가 없다면 완료(onComplete)를 통지 +12. Subscriber는 완료를 처리한다 + +### BackpressureStrategy 종류 + +- **BUFFER** + - 통지할 수 있을 때까지 모든 데이터를 버퍼링 한다 +- **DROP** + - 통지할 수 있을 때까지 새로 생성한 데이터를 삭제한다 +- **LATEST** + - 생성한 최신 데이터만 버퍼링하고 생성할 때마다 버퍼링하는 데이터를 교환한다 +- **ERROR** + - 버퍼 크기를 초과하면 MissingBackPressureException 에러를 통지 +- **NONE** + - 특정 처리를 수행하지 않는다. + - onBackPressure로 시작하는 메서드로 배압 모드를 설정할 때 사용 \ No newline at end of file From f680512efe1cc68460b1766a5d741b4dc1c0589f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Wed, 12 Apr 2023 22:49:48 +0900 Subject: [PATCH 4/7] =?UTF-8?q?[1=EC=9E=A5]=20Observable=20=EB=82=B4?= =?UTF-8?q?=EC=9A=A9=20=EC=B6=94=EA=B0=80=20=EB=B0=8F=20=EC=98=88=EC=A0=9C?= =?UTF-8?q?=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kms/chapter01/L18_ObservableSample.kt | 46 +++++++++++++++++++ src/main/kotlin/kms/chapter01/README.md | 33 ++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/kms/chapter01/L18_ObservableSample.kt diff --git a/src/main/kotlin/kms/chapter01/L18_ObservableSample.kt b/src/main/kotlin/kms/chapter01/L18_ObservableSample.kt new file mode 100644 index 0000000..762eae3 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/L18_ObservableSample.kt @@ -0,0 +1,46 @@ +package kms.chapter01 + +import io.reactivex.Observable +import io.reactivex.ObservableOnSubscribe +import io.reactivex.Observer +import io.reactivex.disposables.Disposable +import io.reactivex.schedulers.Schedulers + +private fun main() { + val observable = Observable.create( + ObservableOnSubscribe { emitter -> + val strArr = arrayOf("Hello, World!", "안녕, RxJava!") + + for (str in strArr) { + // 구독이 해제됐는지를 확인합니다. + if (emitter.isDisposed) { + return@ObservableOnSubscribe + } + emitter.onNext(str) + } + emitter.onComplete() + }, + ) + + observable + .observeOn(Schedulers.computation()) + .subscribe(object : Observer { + override fun onSubscribe(d: Disposable) { + } + + override fun onNext(item: String) { + // 배압 기능이 없기 때문에 데이터를 요청하지 않는다 + println("${Thread.currentThread().name}: $item") + } + + override fun onError(e: Throwable) { + e.printStackTrace() + } + + override fun onComplete() { + println("${Thread.currentThread().name}: 완료") + } + }) + + Thread.sleep(500L) +} diff --git a/src/main/kotlin/kms/chapter01/README.md b/src/main/kotlin/kms/chapter01/README.md index 0b798cf..8a5887d 100644 --- a/src/main/kotlin/kms/chapter01/README.md +++ b/src/main/kotlin/kms/chapter01/README.md @@ -131,4 +131,35 @@ - 버퍼 크기를 초과하면 MissingBackPressureException 에러를 통지 - **NONE** - 특정 처리를 수행하지 않는다. - - onBackPressure로 시작하는 메서드로 배압 모드를 설정할 때 사용 \ No newline at end of file + - onBackPressure로 시작하는 메서드로 배압 모드를 설정할 때 사용 + +### Observable + +- Observable 과 Observer 의 관계와 Flowable 과 Subscriber 의 관계의 차이점 + - Reactive Streams 사양을 구현하지 않았다 + - 배압 기능이 없다 + - 배압이 없기 때문에 오버헤드가 적다 + - 성능이 중요한 경우 사용 + +### Flowable vs Observable + +- **Flowable 사용** + - 대량 데이터(10,000건)를 처리할 때 + - 네트워크 통신이나 데이터베이스 등의 I/O 처리를 할 때 +- **Observable 사용** + - GUI 이벤트 + - 소량 데이터(1,000건)를 처리할 때 + - 데이터 처리가 기본으로 동기 방식이며, 자바 표준의 Stream 대신 사용할 때 + +#### 사용위치 + +- **서버** + - 메모리가 부족하거나 처리 대기 중인 데이터가 쌓이게 되면 서버 전체에 영향 + - 버퍼링의 상한을 정해 **MissingBackpressureException** 을 발생 시킴 +- **클라이언트** + - **MissingBackpressureException** 이 발생하지 않는게 좋음 + +#### 데이터의 일부만 사용하는 경우 + +- BackpressureStrategy.DROP 을 설정하면 처리할 수 없는 데이터를 삭제 가능 +- Observable 의 throttle 계열의 메서드로 특정 시점의 데이터만을 사용 \ No newline at end of file From 40da2ca43d904853651e62566db7cbc1dc0051f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Sat, 15 Apr 2023 15:23:07 +0900 Subject: [PATCH 5/7] =?UTF-8?q?[1=EC=9E=A5]=20BackPressure=20=EB=B6=80?= =?UTF-8?q?=ED=84=B0=201=EC=9E=A5=20=EB=81=9D=EA=B9=8C=EC=A7=80=20?= =?UTF-8?q?=EC=98=88=EC=A0=9C=20=EB=B0=8F=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../L29_CompositeDisposableSample.kt | 29 ++++++++++ .../kotlin/kms/chapter01/L30_SingleSampel.kt | 24 ++++++++ .../kotlin/kms/chapter01/L31_MaybeSample.kt | 30 ++++++++++ .../kms/chapter01/L32_CompletableSample.kt | 26 +++++++++ src/main/kotlin/kms/chapter01/README.md | 58 ++++++++++++++++++- 5 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/kms/chapter01/L29_CompositeDisposableSample.kt create mode 100644 src/main/kotlin/kms/chapter01/L30_SingleSampel.kt create mode 100644 src/main/kotlin/kms/chapter01/L31_MaybeSample.kt create mode 100644 src/main/kotlin/kms/chapter01/L32_CompletableSample.kt diff --git a/src/main/kotlin/kms/chapter01/L29_CompositeDisposableSample.kt b/src/main/kotlin/kms/chapter01/L29_CompositeDisposableSample.kt new file mode 100644 index 0000000..098f351 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/L29_CompositeDisposableSample.kt @@ -0,0 +1,29 @@ +package kms.chapter01 + +import io.reactivex.Flowable +import io.reactivex.disposables.CompositeDisposable +import io.reactivex.schedulers.Schedulers + +private fun main() { + val disposable = CompositeDisposable() + disposable.add( + Flowable.range(1, 3) + .doOnCancel { println("No.1 canceled") } + .observeOn(Schedulers.computation()) + .subscribe { + Thread.sleep(100L) + println("No.1: $it") + }, + ) + disposable.add( + Flowable.range(1, 3) + .doOnCancel { println("No.2 canceled") } + .observeOn(Schedulers.computation()) + .subscribe { + Thread.sleep(100L) + println("No.2: $it") + }, + ) + Thread.sleep(150L) + disposable.dispose() +} diff --git a/src/main/kotlin/kms/chapter01/L30_SingleSampel.kt b/src/main/kotlin/kms/chapter01/L30_SingleSampel.kt new file mode 100644 index 0000000..5074e97 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/L30_SingleSampel.kt @@ -0,0 +1,24 @@ +package kms.chapter01 + +import io.reactivex.Single +import io.reactivex.SingleObserver +import io.reactivex.disposables.Disposable +import java.time.DayOfWeek +import java.time.LocalDate + +private fun main() { + Single.create { emitter -> + emitter.onSuccess(LocalDate.now().dayOfWeek) + }.subscribe(object : SingleObserver { + override fun onSubscribe(d: Disposable) { + } + + override fun onSuccess(dayOfWeek: DayOfWeek) { + println(dayOfWeek) + } + + override fun onError(e: Throwable) { + e.printStackTrace() + } + }) +} diff --git a/src/main/kotlin/kms/chapter01/L31_MaybeSample.kt b/src/main/kotlin/kms/chapter01/L31_MaybeSample.kt new file mode 100644 index 0000000..70153b4 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/L31_MaybeSample.kt @@ -0,0 +1,30 @@ +package kms.chapter01 + +import io.reactivex.Maybe +import io.reactivex.MaybeObserver +import io.reactivex.disposables.Disposable +import java.time.DayOfWeek + +private fun main() { + Maybe.create { emitter -> + // 데이터를 통지할 때는 onSuccess 만 통지 +// emitter.onSuccess(LocalDate.now().dayOfWeek) + // 데이터를 통지 하지 않을 때는 onComplete 만 통지 + emitter.onComplete() + }.subscribe(object : MaybeObserver { + override fun onSubscribe(d: Disposable) { + } + + override fun onSuccess(dayOfWeek: DayOfWeek) { + println(dayOfWeek) + } + + override fun onError(e: Throwable) { + e.printStackTrace() + } + + override fun onComplete() { + println("완료") + } + }) +} diff --git a/src/main/kotlin/kms/chapter01/L32_CompletableSample.kt b/src/main/kotlin/kms/chapter01/L32_CompletableSample.kt new file mode 100644 index 0000000..5629278 --- /dev/null +++ b/src/main/kotlin/kms/chapter01/L32_CompletableSample.kt @@ -0,0 +1,26 @@ +package kms.chapter01 + +import io.reactivex.Completable +import io.reactivex.CompletableObserver +import io.reactivex.disposables.Disposable +import io.reactivex.schedulers.Schedulers + +private fun main() { + Completable.create { emitter -> + emitter.onComplete() + }.subscribeOn(Schedulers.computation()) + .subscribe(object : CompletableObserver { + override fun onSubscribe(d: Disposable) { + } + + override fun onComplete() { + println("완료") + } + + override fun onError(e: Throwable) { + e.printStackTrace() + } + }) + + Thread.sleep(100L) +} diff --git a/src/main/kotlin/kms/chapter01/README.md b/src/main/kotlin/kms/chapter01/README.md index 8a5887d..d789b89 100644 --- a/src/main/kotlin/kms/chapter01/README.md +++ b/src/main/kotlin/kms/chapter01/README.md @@ -162,4 +162,60 @@ #### 데이터의 일부만 사용하는 경우 - BackpressureStrategy.DROP 을 설정하면 처리할 수 없는 데이터를 삭제 가능 -- Observable 의 throttle 계열의 메서드로 특정 시점의 데이터만을 사용 \ No newline at end of file +- Observable 의 throttle 계열의 메서드로 특정 시점의 데이터만을 사용 + +### RxJava 기본 구성 + +- **소비자**(Subscriber/Observer)가 **생산자**(Flowable/Observable)를 구독하는 형태 + - Flowable/Subscriber/Subscription + - Reactive Stream 사양 지원 + - Observable/Observer/Disposable + +>[!note] Reactive Stream 사양 +>BackPressure 을 이용하여 비동기 요소들 사이의 상호작용을 정의하는 작은 스펙 + +- FlowableProcessor 는 Processor 를 구현하고 Flowable 과 Subscriber 가 될 수 있는 추상 클래스 +- Subject 는 BackPressure 가 없는 FlowableProcessor 로 볼 수 있다 + - Observable 이나 Observer 가 될 수 있는 추상 클래스 +- Subscriber 를 구현한 DisposableSubscriber 와 ResourceSubscriber 를 제공 +- Observer 를 구현한 DisposableObserver와 ResourceObserver도 제공 + +### 데이터 통지시 규칙 + +- **null** 을 통지하면 안 된다 +- 데이터 통지는 **해도 되고 안 해도 된다** +- Flowable/ Observable 의 처리를 끝낼 때는 **완료**나 **에러** 통지를 해야 하며, 둘 다 통지하지는 않는다 +- 완료나 에러 통지를 **한 뒤에는 다른 통지를 하면 안된다** +- 통지할 때는 **1건씩 순차적**으로 통지하며, 동시에 통지하면 안 된다 + +### Processor / Subject 종류 + +- **Publish** + - 데이터를 받은 시점에만 소비자에 데이터를 통지 + - SharedFlow 처럼 동작 +- **Behavior** + - 소비자가 구독하기 직전 데이터를 버퍼링해 해당 데이터 부터 통지 + - StateFlow 처럼 동작 +- **Replay** + - 처리하는 도중 구동한 소비자에게도 받은 모든 데이터를 통지 +- **Async** + - 데이터 생성을 완료했을 때 마지막으로 받은 데이터만 소비자에게 통지 +- **Unicast** + - 1개의 소비자만 구독할 수 있다 + +### CompositeDisposable + +- 여러 Disposable 을 모아 한번에 구동 해지하는 클래스 + +### Single / Maybe / Completable + +- **Single** + - 1건만 통지하거나 에러를 통지하는 클래스 + - 통지 자체가 완료를 의미 + - SingleObserver +- **Maybe** + - 0 or 1건만 통지하고 완료 or 에러를 통지하는 클래스 + - MaybeObserver +- **Completable** + - 데이터를 통지하지 않고 완료 or 에러를 통지하는 클래스 + - CompletableObserver \ No newline at end of file From 31362f1b90b948f848dd34073be390ec3864ef58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Sat, 29 Apr 2023 18:53:53 +0900 Subject: [PATCH 6/7] =?UTF-8?q?[2=EC=9E=A5]=20=ED=95=99=EC=8A=B5=20?= =?UTF-8?q?=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/kotlin/kms/chapter02/README.md | 91 +++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 src/main/kotlin/kms/chapter02/README.md diff --git a/src/main/kotlin/kms/chapter02/README.md b/src/main/kotlin/kms/chapter02/README.md new file mode 100644 index 0000000..57c01ac --- /dev/null +++ b/src/main/kotlin/kms/chapter02/README.md @@ -0,0 +1,91 @@ +### 람다식 + +- 함수형 인터페이스를 구현하기위해 도입한 표현식 + - 구현해야하는 메서드가 **1개** + - kotlin 에서는 SAM (Single Abstract Method) 를 이용 + - JAVA 8 부터 도입 + - JAVA 8 이전 버전에서는 Retrolambda 라는 **백포트**를 이용해 사용 가능 + +> [!note] 백포트 +> 상위 버전의 기능을 하위 버전에 반영해주는 것 + +### Rx에서 제공하는 인터페이스 + +#### Function/Predicate + +- 인자 O +- 반환 O + |인터페이스|반환값|메서드| + |---|---|---| + | Function\ | R | apply(t: T) | + | Predicate\ | boolean | test(t: T) | + +#### BooleanSupplier + +- 인자 X +- 반환 O + |인터페이스|반환값|메서드| + |---|---|---| + | BooleanSupplier | boolean | getAsBoolean() | + +#### Action/Consumer + +- 어떤 부가 작용이 발생하는 메서드 +- 반환 X + |인터페이스|반환값|메서드| + |---|---|---| + | Action | 없음 | run() | + | Consumer\ | 없음 | accept(t: T) |\\ + +#### Cancellable + +- 취소처리를 구현하는데 사용 +- 반환 X +- 인자 X + |인터페이스|반환값|메서드| + |---|---|---| + | Cancellable | 없음 | cancel() | + +### 람다식과 익명 클래스의 차이 + +- **this** 의 대상이 다름 +- 람다식 + - **람다식을 구현한 클래스**의 인스턴스 +- 익명 클래스 + - **구현한 익명 클래스**의 인스턴스 + +### 비동기 처리 + +>작업의 처리가 끝나지 않고 다음 작업을 실행 + +- 싱글 스레드에서는 작업이 순차적으로 진행 +- DB, 네트워크 작업시에는 CPU를 사용하지 않는 순간 이 있기 때문에 기다리기 보다 다른 작업을 하는게 효율적 +- 스레드 생성 작업자체가 부하가 걸리는 작업이라 작은 작업은 순차처리하는게 이득 + +### 비동기 처리시 주의할 점 + +#### 메모리와 캐시 + +- 클래스 필드 값과 실제 메모리가 가리키는 값이 동일하지 않을 수 있다 + - 필드가 다루는 값은 메모리에서 캐시된 값 + - 나중에 적절한 시점에 실제 메모리에 값을 변경 + - 캐시 값은 **쓰레드 별**로 존재 + +#### 원자성 + +- **volatile** + - 실제 메모리의 최신 값을 받을 수 있다 + - 업데이트시 원자성을 보장하지 않는다 +- **Atomic** + - 원자성을 보장하는 클래스 + - 자신의 변경 처리에서만 원자성을 보장 +- **synchronized** + - 자신의 스레드가 해당 블록을 처리하는 도중에 다른 스레드에서 접근하는 것을 막는다 + - Lock 객체를 이용하고 해당 객체를 가지고 있으면 해당 블록에서 작업을 실행할 수 있다. + +### 비동기 처리 대응 방안 + +#### final 제한자와 불변 객체 + +- 비동기처리할때 가변 필드나 객체를 다루면 스레드에 의해 다른 캐시 값을 가질 위험이 있다 + - 변수의 재할당이나 상태를 변경하지 못하게함 \ No newline at end of file From e25bcc72aa2ec5d405b7c5183dd0e10ebeafae4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=AA=85=EC=84=9D?= Date: Sat, 29 Apr 2023 19:09:14 +0900 Subject: [PATCH 7/7] =?UTF-8?q?[2=EC=9E=A5]=20Point=20=EC=98=88=EC=A0=9C?= =?UTF-8?q?=20Java=20=EB=A1=9C=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/kms/chapter02/Point.java | 26 ++++++++++++++++ src/main/java/kms/chapter02/PointSample.java | 31 ++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 src/main/java/kms/chapter02/Point.java create mode 100644 src/main/java/kms/chapter02/PointSample.java diff --git a/src/main/java/kms/chapter02/Point.java b/src/main/java/kms/chapter02/Point.java new file mode 100644 index 0000000..d769641 --- /dev/null +++ b/src/main/java/kms/chapter02/Point.java @@ -0,0 +1,26 @@ +package kms.chapter02; + +import java.util.concurrent.atomic.AtomicInteger; + +public class Point { + private final AtomicInteger x = new AtomicInteger(0); + private final AtomicInteger y = new AtomicInteger(0); + + void rightUp() { + x.incrementAndGet(); + y.incrementAndGet(); + } + + int getX() { + return x.get(); + } + + int getY() { + return y.get(); + } + + @Override + public String toString() { + return String.format("%d, %d", getX(), getY()); + } +} diff --git a/src/main/java/kms/chapter02/PointSample.java b/src/main/java/kms/chapter02/PointSample.java new file mode 100644 index 0000000..44e3334 --- /dev/null +++ b/src/main/java/kms/chapter02/PointSample.java @@ -0,0 +1,31 @@ +package kms.chapter02; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class PointSample { + + public static void main(String[] args) throws Exception { + final Point pos = new Point(); + + Runnable task = () -> { + for(int i = 0; i < 10000 ; i++) { + pos.rightUp(); + } + }; + + ExecutorService executorService = Executors.newCachedThreadPool(); + + Future future1 = executorService.submit(task, true); + Future future2 = executorService.submit(task, true); + + if(future1.get() && future2.get()) { + System.out.println(pos); + } else { + System.out.println("failed"); + } + + executorService.shutdown(); + } +}