Я пытаюсь выполнить одно и то же требование: у меня есть служба Java, которая должна выбрать лидера, и у меня нет проверок работоспособности, сконфигурированных в Consul.
Использование LeaderElectionUtil
у Консула-клиента проблематично, потому что, если все причины, указанные выше. К сожалению, также невозможно настроить LeaderElectionUtil
, потому что все его внутренние работы выполняются с использованием частных методов (он должен был использовать protected
и позволить пользователям переопределять создание сеанса - например).
Я пробовал реализовать «Сервисную регистрацию», как описано в «Основное использование - пример 1» в консул-клиенте README, но calling AgentClient.pass()
always throws an exception для меня.
Итак, мое решение - именно то, что вы указали, - выполните сеанс с TTL и обновите его до тех пор, пока служба жива.
Вот моя реализация, которая требует от пользователя и зарегистрировать функцию обратного вызова, который используется, чтобы проверить, если услуга остается в силе для обновления, только в случае, если:
public class SessionHolder implements Runnable {
private static final String TTL_TEMPLATE = "%ss";
private Consul client;
private String id;
private LinkedList<Supplier<Boolean>> liveChecks = new LinkedList<>();
private long ttl;
private boolean shutdown = false;
public SessionHolder(Consul client, String service, long ttl) {
this.client = client;
this.ttl = ttl;
final Session session = ImmutableSession.builder()
.name(service)
.ttl(String.format(TTL_TEMPLATE, ttl))
.build();
id = client.sessionClient().createSession(session).getId();
Thread upkeep = new Thread(this);
upkeep.setDaemon(true);
upkeep.start();
}
public String getId() {
return id;
}
public void registerKeepAlive(Supplier<Boolean> liveCheck) {
liveChecks.add(liveCheck);
}
@Override
public synchronized void run() {
// don't start renewing immediately
try {
wait(ttl/2 * 1000);
} catch (InterruptedException e) {}
while (!isShutdown()) {
if (liveChecks.isEmpty() || liveChecks.stream().allMatch(Supplier::get)) {
client.sessionClient().renewSession(getId());
}
try {
wait(ttl/2 * 1000);
} catch (InterruptedException e) {
// go on, try again
}
}
}
public synchronized boolean isShutdown() {
return shutdown;
}
public synchronized void close() {
shutdown = true;
notify();
client.sessionClient().destroySession(getId());
}
}
Тогда избрав лидером является более или менее так просто, как:
if (consul.keyValueClient().acquireLock(getServiceKey(service), currentNode, sessionHolder.getId()))
return true; // I'm the leader
Одна вещь, которая нуждается в помня, что если сеанс завершается без очистки надлежащим образом (что я выше в SessionHolder.close()
), то lock-delay
особенность консулом предотвратит новый лидер будет избран е или около 15 секунд (по умолчанию, который, к сожалению, Consul-клиент не предлагает API для изменения).
Чтобы решить эту проблему, кроме того, чтобы убедиться, что надлежащим образом завершающие службы очищаются после себя, как показано выше, я также должен обеспечить, чтобы сервис занимал позицию лидера за минимальное время, необходимое для освобождения руководства когда его больше не используют, вызывая consul.keyValueClient().releaseLock()
. Например, у меня есть кластерная служба, в которой мы выбираем лидера для чтения обновлений данных из внешней РСУБД (которые затем распределяются в кластере напрямую, а не каждый узел, перезагружая все данные). Поскольку это делается посредством опроса, каждый узел будет пытаться избираться до опроса, а если он будет избран, он будет опробовать базу данных, распространить обновление и уйти в отставку. Если после этого произойдет сбой, delay-lock
не предотвратит опрос другого узла.
Это сеанс, но в соответствии с документом, который вы указали выше, вы shoul d также попытайтесь приобрести замок на ключ. Позаботьтесь о том, чтобы поделиться соответствующим кодом для этого? – Guss