0

Я с трудом пытаюсь выяснить причину, я продолжаю видеть:Не удается выяснить причину StaleObjectStateException

`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)` 

У меня есть служба, которая использует Quartz Scheduler уволить рабочие места, в в моем контексте эти задания называются Flows, и каждый поток может состоять из нескольких Tasks, потоки и задачи - Executables, а информация об их действии Executions хранится как FlowExecutions и TaskExecutions. Для запуска потоков служба использует FlowService.

UPD: Работа Кварца «Исполнитель» несет ответственность за стрельбу по моим потокам/задачам. Когда он запускается, он использует FlowService для запуска всего, что он должен. Поэтому мне интересно, возможно ли, что кварцевый поток не создает новые сеансы hibernate каждый раз, когда он использует службу, и это является причиной проблемы. Я не изменил рамки FlowService, так что это синглтон, как GORM управляет сеансом, используемым им?

UPD2: Пробовал использовать persistenceContextInterceptor на ExecutorJob, чтобы убедиться, что каждое использование службы использует новый сеанс, но это не решило проблему. Добавлен упрощенный код для ExecutorJob.

Я не мог воспроизвести проблему локально, но это происходит часто при производстве, а точнее, когда есть много потоков для инициирования. Я пробовал синхронизировать методы для задач и потоков, но это не сработало, теперь я попытаюсь использовать пессимистическую блокировку, но я предполагаю, что она не решит проблему, поскольку проверка журналов приложений кажется такой не являются двумя потоками, обновляющими одну и ту же строку. После того, как я попытался показать упрощенную версию кода, имитирующую структуру проекта.

// ------------------ 
// DOMAIN CLASSES 
// ------------------ 
abstract class Executable { 
    static hasMany = [flowTasks: FlowTask] 
    static transients = ['executions'] 

    List<Execution> getExecutions() { 
     this.id ? Execution.findAllByExecutable(this) : [] 
    } 

    void addToExecutions(Execution execution) { 
     execution.executable = this 
     execution.save() 
    } 

    abstract List<Execution> execute(Map params) 
} 

class Flow extends Executable { 
    SortedSet<FlowTask> tasks 
    static hasMany = [tasks: FlowTask] 

    private static final Object lockExecute = new Object() 
    private static final Object lockExecuteTask = new Object() 

    List<FlowExecution> execute(Map params) { 
     synchronized (lockExecute) { 
      List<Map> multiParams = multiplyParams(params) 
      multiParams.collect { Map param -> 
       FlowExecution flowExecution = new FlowExecution() 
       addToExecutions(flowExecution) 
       flowExecution.save() 
       this.attach() 
       save() 
       executeTasks(firstTasks(param), flowExecution, param) 
      } 
     } 
    } 

    List<Map> multiplyParams(Map params) { 
     // creates a list of params for the executions that must be started 
     [params] 
    } 

    Set<FlowTask> firstTasks(Map params) { 
     // finds the first tasks to be executed for the flow 
     tasks.findAdll { true } 
    } 

    private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) { 
     synchronized (lockExecuteTask) { 
      tasks.each { FlowTask flowTask -> 
       try { 
        List<Execution> executions = flowTask.execute(params) 
        executions.each { Execution execution -> 
         flowExecution.addToExecutions(execution) 
        } 
        flowExecution.attach() 
       } catch { 
        // log error executing task 
        throw e 
       }    
      } 

      this.attach() 
      try { 
       save(flush: true) 
      } catch (HibernateOptimisticLockingFailureException e) { 
       // log error saving flow 
       throw e 
      } 

      flowExecution 
     } 
    } 

} 

class Task extends Executable { 
    private static final Object lockExecute = new Object() 
    private static final Object lockGetExecution = new Object() 

    TaskExecution execute(TaskExecution execution) { 
     taskService.start(execution) 
     execution 
    } 

    List<TaskExecution> execute(Map params) { 
     synchronized (lockExecute) { 
      List<Map> multiExecParams = multiplyParams(params) 
      multiExecParams.collect { Map param -> 
       TaskExecution execution = getExecution(param) 
       execute(execution) 
      } 
     } 
    } 

    TaskExecution getExecution(Map params) { 
     synchronized (lockGetExecution) { 
      TaskExecution execution = new TaskExecution(executable: this) 
      execution.setParameters(params) 
      addToExecutions(execution) 

      execution.attach() 
      execution.flowExecution?.attach() 
      this.attach() 
      try { 
       save(flush: true) 
      } catch (HibernateOptimisticLockingFailureException e) { 
       // log error saving task 
       throw e 
      } 

      execution 
     } 
    } 

    List<Map> multiplyParams(Map params) { 
     // creates a list of params for the tasks that must be started 
     [params] 
    } 

} 

class FlowTask { 
    static belongsTo = [flow: Flow, executable: Executable] 

    List<Execution> execute(Map params) { 
     executable.execute(params) 
    } 
} 

abstract class Execution { 
    Map parameterData = [:] 
    static belongsTo = [executable: Executable, flowExecution: FlowExecution] 
    static transients = ['parameters', 'taskExecutions'] 
    void setParameters(Map params) { 
     params.each { key, value -> 
      parameterData[key] = JsonParser.toJson(value) 
     } 
    } 
} 

class TaskExecution extends Execution { 
} 

class FlowExecution extends Execution { 
    List<Execution> executions 
    static transients = ['executions'] 

    FlowExecution() { 
     executions = [] 
    } 

    Set<TaskExecution> getTaskExecutions() { 
     executions?.collect { Execution execution -> 
      return execution.taskExecution 
     }?.flatten()?.toSet() 
    } 

    void addToExecutions(Execution execution){ 
     executions.add(execution) 
     execution.flowExecution = this 
     execution.save() 
    } 

    def onLoad() { 
     try { 
      executions = this.id ? Execution.findAllByFlowExecution(this) : [] 
     } catch (Exception e){ 
      log.error(e) 
      [] 
     } 
    } 
} 

// ----------------- 
// SERVICE CLASSES 
// ----------------- 
class FlowService { 

    Map start(long flowId, Map params) { 
     Flow flow = Flow.lock(flowId) 

     startFlow(flow, params) 
    } 

    private Map startFlow(Flow flow, Map params) { 
     List<RunningFlow> runningFlows = flow.execute(params) 

     [data: [success: true], status: HTTP_OK] 
    } 
} 

//-------------------------------------- 
// Quartz job 
//-------------------------------------- 
class ExecutorJob implements InterruptableJob { 

    def grailsApplication = Holders.getGrailsApplication() 

    static triggers = {} 

    private Thread thread 

    void execute(JobExecutionContext context) throws JobExecutionException { 
     thread = Thread.currentThread() 
     synchronized (LockContainer.taskLock) { 
      Map params = context.mergedJobDataMap 
      def persistenceInterceptor = persistenceInterceptorInstance 

      try { 
       persistenceInterceptor.init() 

       Long executableId = params.executableId as Long 

       def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance 
       service.start(executableId, params) 
      } catch (Exception e) { 
       // log error 
      } finally { 
       persistenceInterceptor.flush() 
       persistenceInterceptor.destroy() 
      } 
     } 
    } 

    PersistenceContextInterceptor getPersistenceInterceptorInstance() { 
     grailsApplication.mainContext.getBean('persistenceInterceptor') 
    } 

    FluxoService getFlowServiceInstance() { 
     grailsApplication.mainContext.getBean('flowService') 
    } 

    TarefaService getTaskServiceInstance() { 
     grailsApplication.mainContext.getBean('taskService') 
    } 

    @Override 
    void interrupt() throws UnableToInterruptJobException { 
     thread?.interrupt() 
    }  
} 

Кто-нибудь знает что-то, что может помочь?

+0

Где код, который извлекает экземпляр потока и вызывает Flow.execute()? –

+0

Это метод запуска FlowService. –

+0

Отлично. Это код, который мне нужно увидеть. –

ответ

0

Ну, Трудно понять, что происходит не так. Однако, я думаю, эта ошибка возникает, когда у вас есть объект в сеансе, который уже был сохранен или обновлен какой-либо другой транзакцией. Опять же, когда hibernate пытается сохранить этот объект, он дает Ряд был обновлен error другой транзакцией Ошибка.

Я думаю, вы можете попробовать обновить, прежде чем сохранять свой объект и посмотреть, как он идет.

http://grails.github.io/grails-doc/2.3.4/ref/Domain%20Classes/refresh.html

def b = Book.get(1) 
… 
b.refresh()