2016-06-07 3 views
0

Я написал некоторый код, чтобы загрузить файл с сервера, тем временем обновляя индикатор выполнения. Скачивание кода выполнялось в Schedulers.io thread и обновлялся код ui в AndroidSchedulers.mainThread. Моя программа завершилась после начала загрузки. Вот мой код:RxJava: InterruptedIOException

Observable 
    .create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       Response response = getResponse(url); 
       if (response != null && response.isSuccessful()) { 
        InputStream is = response.body().byteStream(); 
        subscriber.onNext(response.body().contentLength()); // init progress 
        File storedFile = Utils.getStoredFile(context, filePath); 
        OutputStream os = new FileOutputStream(storedFile); 

        byte[] buffer = new byte[1024]; 
        int len; 
        while ((len = is.read(buffer)) != -1) { 
         // write data 
         os.write(buffer, 0, len); 

         count += len; 
         subscriber.onNext(count); // update progress 
        } 

        if (!subscriber.isUnsubscribed()) { 
         subscriber.onCompleted(); 
        } 

        os.close(); 
        is.close(); 
        response.body().close(); 

      } catch (InterruptedException e) { 
       subscriber.onError(e); 
      } 
     } 
    }) 
    .subscribeOn(Schedulers.io()) // io and network operation 
    .observeOn(AndroidSchedulers.mainThread()) // UI view update operation 
    .subscribe(new Observer<Long>() { 
     @Override 
     public void onCompleted() { 
      Log.d(TAG, "onCompleted -> " + Thread.currentThread().getName()); 
     } 

     @Override 
     public void onError(Throwable e) { 
      Log.d(TAG, "onError -> " + e.getMessage()); 
     } 

     @Override 
     public void onNext(Long progress) { 
      Log.d(TAG, "onNext -> " + Thread.currentThread().getName()); 
      Log.d(TAG, "onNext progress -> " + progress); 
      // here update view in ui thread 
     } 
    } 
    } 

А вот текст ошибки:

java.io.InterruptedIOException: thread interrupted 
    at okio.Timeout.throwIfReached(Timeout.java:145) 
    at okio.Okio$2.read(Okio.java:136) 
    at okio.AsyncTimeout$2.read(AsyncTimeout.java:211) 
    at okio.RealBufferedSource.read(RealBufferedSource.java:50) 
    at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:418) 
    at okio.RealBufferedSource$1.read(RealBufferedSource.java:371) 
    at java.io.InputStream.read(InputStream.java:163) 
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:74) 
    at com.eldorado.rxfiledownloaddemo.presenter.Presenter$1.call(Presenter.java:52) 
    at rx.Observable.unsafeSubscribe(Observable.java:8098) 
    at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62) 
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executor at java.util.concurrent.FutureTask.run(FutureTask.java:23 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573) 
    at java.lang.Thread.run(Thread.java:841) 
+0

Я думаю, что может быть ошибка в вашей реализации наблюдаемой, но я не уверен, где именно без отладки полного кода. Согласно вашему StackTrace, что-то не так в строке 74 или 52 в вашем файле Presenter.java. Может быть, попробуйте использовать Retrofit (https://github.com/square/retrofit) для HTTP-соединений вместо того, чтобы использовать для этого собственный код? Здесь легко ошибиться. –

ответ

0

observerOn это применить к Observable.create но internaly вы создаете новый наблюдаемым в другом потоке. Таким образом, ваш конвейер никогда не дает монитору основной поток. Я думаю, что ваш код слишком сложный для того, чего вы хотите достичь.

Только в том случае, поможет вам понять концепции планировщика

https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java