2017-01-11 5 views
0

У меня есть приложение в моем приложении для загрузки двух ZIP-файлов с сервера. Для этого я использовал retrofit + rxjava (создал две отдельные услуги по переоснащению). Для параллельного выполнения I подписывается как переоснащение службы в новом потоке, так и позднее ее объединение с использованием zip-оператора. Он работает нормально. Но позже я добавил оператора карты как к службе для распаковки, но не выполнению кода, написанного на операторе карты, и управление переходит непосредственно к операции zip. Я не понимаю, как это сделать, и я новичок в реактивном мире.Android Retrofit проблема с параллельной загрузкой файлов

То, что я пытался до сих пор

Observable<Response<ResponseBody>> dFileObservable = dbDownloadApi.downloadDealerData(WebServiceConstants.ACTION_DEALER_DATA, 
      params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread()); 
    dFileObservable.map(new Function<Response<ResponseBody>, String>() { 
     @Override 
     public String apply(Response<ResponseBody> responseBody) throws Exception { 
      String header = responseBody.headers().get("Content-Disposition"); 
      String filename = header.replace("attachment; filename=", ""); 
      String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath(); 
      String dealerZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.DEALER_FOLDER_NAME); 
      fileManager.writeDownloadedFileToDisk(dealerZipPath,filename, responseBody.body().source()); 
      String dealerFilePath = dealerZipPath+File.separator+filename; 
      unzipUtility.unzip(dealerFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME); 
      return dealerFilePath; 
     } 
    }); 

    Observable<Response<ResponseBody>> generalFileObservable = dbDownloadApi.downloadGeneralData(WebServiceConstants.ACTION_GENERAL_DATA, 
      params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());; 
    generalFileObservable.map(new Function<Response<ResponseBody>, String>() { 
     @Override 
     public String apply(Response<ResponseBody> responseBody) throws Exception { 
      String header = responseBody.headers().get("Content-Disposition"); 
      String filename = header.replace("attachment; filename=", ""); 
      String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath(); 
      String generalZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.GENERAL_FOLDER_NAME); 
      fileManager.writeDownloadedFileToDisk(generalZipPath,filename, responseBody.body().source()); 
      String generalFilePath = generalZipPath+File.separator+filename; 
      unzipUtility.unzip(generalFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME); 
      return generalFilePath; 
     } 
    }); 

    Observable<String> zipped = Observable.zip(dealerFileObservable, generalFileObservable, new BiFunction<Response<ResponseBody>, Response<ResponseBody>, String>() { 
     @Override 
     public String apply(Response<ResponseBody> responseBodyResponse, Response<ResponseBody> responseBodyResponse2) throws Exception { 
      System.out.println("zipped yess"); 
      return null; 
     } 
    }).observeOn(Schedulers.io()); 

    zipped.subscribe(getObserver()); 

и функцию getObserver()

private Observer<String> getObserver(){ 

    return new Observer<String>() { 
     @Override 
     public void onSubscribe(Disposable d) { 

     } 

     @Override 
     public void onNext(String value) { 

      System.out.println("------------total time-----------"); 
      System.out.println("result value-->"+value); 
     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onComplete() { 

     } 
    }; 
} 

Когда код выполняет управление передается функции применяются() в почтовый оператор и оператор карты в обоих наблюдаемых не выполняется.

И есть еще один вопрос

Я слияние/сжать два наблюдаемые и тип передается оператору Response < «ResponseBody»>. На самом деле мне нужен загруженный путь к файлу (тип строки), и для этого мне нужно что-то делать?

**

Обновленный решение, как описано @Yaroslav Stavnichiy и теперь его работы

**

Observable<String> deObservable = dbDownloadApi.downloaddData(WebServiceConstants.ACTION_DATA, 
      params.getNumber(),params.getId(),params.getCtId(), params.getSessionId()) 
      .flatMap(new Function<Response<ResponseBody>, ObservableSource<String>>() { 
       @Override 
       public ObservableSource<String> apply(Response<ResponseBody> responseBody) throws Exception { 
        String zipPath = fileManager.processDownloadedFile(StrConstants.FOLDER_NAME, 
          StrConstants.FILE_NAME,responseBody.body().source()); 
        return Observable.just(zipPath); 
       } 
      }).map(new Function<String, String>() { 
       @Override 
       public String apply(String filePath) throws Exception { 
        String unzipDestinationPath = fileManager.makeAndGetDownloadFolderPath()+ 
          File.separator+ StrConstants.FOLDER_NAME; 
        unzipUtility.unzip(filePath, unzipDestinationPath); 
        return unzipDestinationPath; 
       } 
      }).subscribeOn(Schedulers.newThread()); 

ответ

2

То, что вы эффективно делать это:

Observable a = ...; 
Observable b = ...; 
a.map(...); 
b.map(...); 
Observable.zip(a, b).subscribe(f); 

map() (как и все другие rx-операторы) не мутирует источник. Он возвращает новые наблюдаемые данные, которые можно использовать в дальнейших вычислениях. В вашем коде вы игнорируете возвращенные объекты. Вы застегиваете оригинальные наблюдаемые, а не сопоставленные, поэтому функции mapper не вызываются.

Я думаю, что вы хотели бы сделать следующее:

Observable a = ... .map(...); 
Observable b = ... .map(...); 
Observable.zip(a, b).subscribe(f); 
+0

Вы имеете в виду, чтобы написать определение в декларации времени (например, написание карты и все оператора) – Sunny

+1

@Sunny я имею в виду, чтобы не потерять возвращаемое значение ' map() 'call, потому что это наблюдаемое, которое вы действительно хотите почерпнуть. Вы в настоящее время зажимаете оригинальные наблюдаемые объекты, игнорируя результат 'map()'. –

+0

ОК теперь я получил его. попробуем и дадим вам знать .. – Sunny

Смежные вопросы