Đa luồng (Multi thread) và hàm zip của ReactiveX trong Android

Bài này tôi sẽ giới thiệu cách sử dụng hàm zip cho lập trình đa luồng trong thư viện RxJava (ReactiveX).

Bài toán đặt ra là có 2 luồng (Thread) chạy song song độc lập và chỉ khi 2 luồng đó kết thúc chúng ta mới làm tiếp việc thứ 3 (ví dụ: chúng ta cập nhật dữ liệu lên màn hình (UI) chỉ sau khi cả 2 luồng 1 và 2 kết thúc).

Cách thực hiện:

Bước 1: tạo ra luồng thứ 1, các bạn xem ví dụ đơn giản dưới đây, luồng này sẽ kết thúc nếu số ngẫu nhiên được sinh ra chia hết cho 3, kết quả trả về là một số nguyên:

//Create an observable 1 in the new Thread, result type is an integer
Observable observable1 = Observable.fromCallable(new Callable() {
    @Override
    public Integer call() throws Exception {
        //an variable to control/exit thread 1
        boolean mThread1Finished = false;

        while (!mThread1Finished) {
            mThread1Finished = new Random().nextInt(999) % 3 == 0;
            Log.d("Thread 1", "" + mThread1Finished);
        }
        return 1;
    }

}).subscribeOn(Schedulers.newThread());

Bước 2: tạo ra luồng thứ 2, các bạn xem ví dụ dưới đây, luồng này sẽ kết thúc nếu số ngẫu nhiên được sinh ra chia hết cho 5, kết quả trả về là 1 kiểu String:

//Create an observable 2 in the new Thread, result type is a String
Observable observable2 = Observable.fromCallable(new Callable() {
    @Override
    public String call() throws Exception {
        //an variable to control/exit thread 2
        boolean mThread2Finished = false;

        while (!mThread2Finished) {
            mThread2Finished = new Random().nextInt(999) % 5 == 0;
            Log.d("Thread 2", "" + mThread2Finished);
        }
        return "Thread 2";
    }

}).subscribeOn(Schedulers.newThread());

Bước 3: Nén 2 luồng trên lại bằng hàm zip, và kết quả trả về về là kiểu Object (chú ý: val1 là kiểu của kết quả luồng 1, val 2 là kiểu của kết quả luồng 2, sau khi 2 luồng trên đã chạy song, kết quả chúng ta muốn trả về có kiểu là Object)

//Create an zipped observable, result type is an Object 
//(note: any type you want, just process it in return statement line
Observable<Object> zippedObservable = Observable.zip(observable1, observable2,
 new Func2<Integer, String, Object>() {
    @Override
    public Object call(Integer val1, String val2) {
        Object result = "Result:" + val1 + ", " + val2;
        return result;
    }
});

Bước 4: Chạy 2 luồng trên thông qua observable đã được nén zippedObservable. Các bạn xem đoạn mã dưới đây:

//Observer the result in the mainThread Scheduler
//in the android we should use AndroidSchedulers.mainThread()
zippedObservable.observeOn(AndroidSchedulers.mainThread());

//subscribe the two observable above
zippedObservable.subscribe(new Observer<Object>() {
    @Override
    public void onCompleted() {
        Log.d("zippedObservable", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.d("zippedObservable", "onError", e);
    }

    @Override
    public void onNext(Object result) {
        Log.d("zippedObservable", "result:" + result.toString());
        updateUI(result);
    }
});

Các bạn chú ý hàm subscribeOn(Schedulers.newThread()) là để tạo ra 1 observerable chạy trên 1 luồng mới riêng biệt.
Trong Android, khi muốn kết quả trả về được điều khiển và có thể cập nhật lên UI, chúng ta dùng hàm observeOn(AndroidSchedulers.mainThread()) để thiết lập mainThread cho observerable đó.

Chúc các bạn thành công.

Tác giả: Lê Trung Thu

Advertisements

Trả lời

Mời bạn điền thông tin vào ô dưới đây hoặc kích vào một biểu tượng để đăng nhập:

WordPress.com Logo

Bạn đang bình luận bằng tài khoản WordPress.com Đăng xuất / Thay đổi )

Twitter picture

Bạn đang bình luận bằng tài khoản Twitter Đăng xuất / Thay đổi )

Facebook photo

Bạn đang bình luận bằng tài khoản Facebook Đăng xuất / Thay đổi )

Google+ photo

Bạn đang bình luận bằng tài khoản Google+ Đăng xuất / Thay đổi )

Connecting to %s