Я с трудом пытаюсь выяснить причину, я продолжаю видеть:Не удается выяснить причину StaleObjectStateException
`HibernateOptimisticLockingFailureException: FlowExecution: optimistic locking failed; nested exception is org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect)`
У меня есть служба, которая использует Quartz Scheduler уволить рабочие места, в в моем контексте эти задания называются Flows
, и каждый поток может состоять из нескольких Tasks
, потоки и задачи - Executables
, а информация об их действии Executions
хранится как FlowExecutions
и TaskExecutions
. Для запуска потоков служба использует FlowService
.
UPD: Работа Кварца «Исполнитель» несет ответственность за стрельбу по моим потокам/задачам. Когда он запускается, он использует FlowService для запуска всего, что он должен. Поэтому мне интересно, возможно ли, что кварцевый поток не создает новые сеансы hibernate каждый раз, когда он использует службу, и это является причиной проблемы. Я не изменил рамки FlowService, так что это синглтон, как GORM управляет сеансом, используемым им?
UPD2: Пробовал использовать persistenceContextInterceptor на ExecutorJob, чтобы убедиться, что каждое использование службы использует новый сеанс, но это не решило проблему. Добавлен упрощенный код для ExecutorJob.
Я не мог воспроизвести проблему локально, но это происходит часто при производстве, а точнее, когда есть много потоков для инициирования. Я пробовал синхронизировать методы для задач и потоков, но это не сработало, теперь я попытаюсь использовать пессимистическую блокировку, но я предполагаю, что она не решит проблему, поскольку проверка журналов приложений кажется такой не являются двумя потоками, обновляющими одну и ту же строку. После того, как я попытался показать упрощенную версию кода, имитирующую структуру проекта.
// ------------------
// DOMAIN CLASSES
// ------------------
abstract class Executable {
static hasMany = [flowTasks: FlowTask]
static transients = ['executions']
List<Execution> getExecutions() {
this.id ? Execution.findAllByExecutable(this) : []
}
void addToExecutions(Execution execution) {
execution.executable = this
execution.save()
}
abstract List<Execution> execute(Map params)
}
class Flow extends Executable {
SortedSet<FlowTask> tasks
static hasMany = [tasks: FlowTask]
private static final Object lockExecute = new Object()
private static final Object lockExecuteTask = new Object()
List<FlowExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiParams = multiplyParams(params)
multiParams.collect { Map param ->
FlowExecution flowExecution = new FlowExecution()
addToExecutions(flowExecution)
flowExecution.save()
this.attach()
save()
executeTasks(firstTasks(param), flowExecution, param)
}
}
}
List<Map> multiplyParams(Map params) {
// creates a list of params for the executions that must be started
[params]
}
Set<FlowTask> firstTasks(Map params) {
// finds the first tasks to be executed for the flow
tasks.findAdll { true }
}
private FlowExecution executeTasks(Set<FlowTask> tasks, FlowExecution flowExecution, Map params) {
synchronized (lockExecuteTask) {
tasks.each { FlowTask flowTask ->
try {
List<Execution> executions = flowTask.execute(params)
executions.each { Execution execution ->
flowExecution.addToExecutions(execution)
}
flowExecution.attach()
} catch {
// log error executing task
throw e
}
}
this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving flow
throw e
}
flowExecution
}
}
}
class Task extends Executable {
private static final Object lockExecute = new Object()
private static final Object lockGetExecution = new Object()
TaskExecution execute(TaskExecution execution) {
taskService.start(execution)
execution
}
List<TaskExecution> execute(Map params) {
synchronized (lockExecute) {
List<Map> multiExecParams = multiplyParams(params)
multiExecParams.collect { Map param ->
TaskExecution execution = getExecution(param)
execute(execution)
}
}
}
TaskExecution getExecution(Map params) {
synchronized (lockGetExecution) {
TaskExecution execution = new TaskExecution(executable: this)
execution.setParameters(params)
addToExecutions(execution)
execution.attach()
execution.flowExecution?.attach()
this.attach()
try {
save(flush: true)
} catch (HibernateOptimisticLockingFailureException e) {
// log error saving task
throw e
}
execution
}
}
List<Map> multiplyParams(Map params) {
// creates a list of params for the tasks that must be started
[params]
}
}
class FlowTask {
static belongsTo = [flow: Flow, executable: Executable]
List<Execution> execute(Map params) {
executable.execute(params)
}
}
abstract class Execution {
Map parameterData = [:]
static belongsTo = [executable: Executable, flowExecution: FlowExecution]
static transients = ['parameters', 'taskExecutions']
void setParameters(Map params) {
params.each { key, value ->
parameterData[key] = JsonParser.toJson(value)
}
}
}
class TaskExecution extends Execution {
}
class FlowExecution extends Execution {
List<Execution> executions
static transients = ['executions']
FlowExecution() {
executions = []
}
Set<TaskExecution> getTaskExecutions() {
executions?.collect { Execution execution ->
return execution.taskExecution
}?.flatten()?.toSet()
}
void addToExecutions(Execution execution){
executions.add(execution)
execution.flowExecution = this
execution.save()
}
def onLoad() {
try {
executions = this.id ? Execution.findAllByFlowExecution(this) : []
} catch (Exception e){
log.error(e)
[]
}
}
}
// -----------------
// SERVICE CLASSES
// -----------------
class FlowService {
Map start(long flowId, Map params) {
Flow flow = Flow.lock(flowId)
startFlow(flow, params)
}
private Map startFlow(Flow flow, Map params) {
List<RunningFlow> runningFlows = flow.execute(params)
[data: [success: true], status: HTTP_OK]
}
}
//--------------------------------------
// Quartz job
//--------------------------------------
class ExecutorJob implements InterruptableJob {
def grailsApplication = Holders.getGrailsApplication()
static triggers = {}
private Thread thread
void execute(JobExecutionContext context) throws JobExecutionException {
thread = Thread.currentThread()
synchronized (LockContainer.taskLock) {
Map params = context.mergedJobDataMap
def persistenceInterceptor = persistenceInterceptorInstance
try {
persistenceInterceptor.init()
Long executableId = params.executableId as Long
def service = (Executable.get(executableId) instanceof Flow) ? flowServiceInstance : taskServiceInstance
service.start(executableId, params)
} catch (Exception e) {
// log error
} finally {
persistenceInterceptor.flush()
persistenceInterceptor.destroy()
}
}
}
PersistenceContextInterceptor getPersistenceInterceptorInstance() {
grailsApplication.mainContext.getBean('persistenceInterceptor')
}
FluxoService getFlowServiceInstance() {
grailsApplication.mainContext.getBean('flowService')
}
TarefaService getTaskServiceInstance() {
grailsApplication.mainContext.getBean('taskService')
}
@Override
void interrupt() throws UnableToInterruptJobException {
thread?.interrupt()
}
}
Кто-нибудь знает что-то, что может помочь?
Где код, который извлекает экземпляр потока и вызывает Flow.execute()? –
Это метод запуска FlowService. –
Отлично. Это код, который мне нужно увидеть. –