Алгоритм Spath не реализован на Python, насколько я знаю.
Но вы могли бы повторить свои результаты с помощью гауссовых смесей в scikit учиться:
import numpy as np
from sklearn.mixture import GaussianMixture
import matplotlib.pyplot as plt
# generate random data
np.random.seed(1)
n = 10
x1 = np.random.uniform(0, 20, size=n)
x2 = np.random.uniform(0, 20, size=n)
y1 = x1 + np.random.normal(size=n)
y2 = 15 - x2 + np.random.normal(size=n)
x = np.concatenate([x1, x2])
y = np.concatenate([y1, y2])
data = np.vstack([x, y]).T
model = GaussianMixture (n_components=2).fit(data)
plt.scatter(x, y, c=model.predict(data))
plt.show()
Этот код производит картину, аналогичную той, в статье:
ГИМ отличается от алгоритма Spath, поскольку первый пытается максимизировать точность прогноза ВСЕХ данных (X и y), а последний максимизирует только R^2 у. На мой взгляд, для большинства практических проблем вы предпочли бы GMM.
Если вы все еще хотите алгоритм Шпет, это может быть сделано с классом, как это, реализует вариант алгоритма EM:
import numpy as np
from sklearn.linear_model import Ridge
from sklearn.base import RegressorMixin, BaseEstimator, clone
class ClusteredRegressor(RegressorMixin, BaseEstimator):
def __init__(self, n_components=2, base=Ridge(), random_state=1, max_iter=100, tol=1e-10, verbose=False):
self.n_components = n_components
self.base = base
self.random_state = random_state
self.max_iter = max_iter
self.tol = tol
self.verbose = verbose
def fit(self, X, y):
np.random.seed(self.random_state)
self.estimators_ = [clone(self.base) for i in range(self.n_components)]
# initialize cluster responsibilities randomly
self.resp_ = np.random.uniform(size=(X.shape[0], self.n_components))
self.resp_ /= self.resp_.sum(axis=1, keepdims=True)
for it in range(self.max_iter):
old_resp = self.resp_.copy()
# Estimate sample-weithted regressions
errors = np.empty(shape=self.resp_.shape)
for i, est in enumerate(self.estimators_):
est.fit(X, y, sample_weight=self.resp_[:, i])
errors[:, i] = y - est.predict(X)
self.mse_ = np.sum(self.resp_ * errors**2)/X.shape[0]
if self.verbose:
print(self.mse_)
# Recalculate responsibilities
self.resp_ = np.exp(-errors**2/self.mse_)
self.resp_ /= self.resp_.sum(axis=1, keepdims=True)
# stop if change in responsibilites is small
delta = np.abs(self.resp_ - old_resp).mean()
if delta < self.tol:
break
self.n_iter_ = it
return self
def predict(self, X):
""" Calculate a matrix of conditional predictions """
return np.vstack([est.predict(X) for est in self.estimators_]).T
def predict_proba(self, X, y):
""" Estimate cluster probabilities of labeled data """
predictions = self.predict(X)
errors = np.empty(shape=self.resp_.shape)
for i, est in enumerate(self.estimators_):
errors[:, i] = y - est.predict(X)
resp_ = np.exp(-errors**2/self.mse_)
resp_ /= resp_.sum(axis=1, keepdims=True)
return resp_
Этот код аналогичен алгоритму SPATH, с той лишь разницей, что использует мягкие «обязанности» каждого кластера для каждого наблюдения, вместо жесткого назначения кластера (таким образом, это проще для оптимизации). Вы можете видеть, что в результате присваивание кластера похоже на GMM:
model = ClusteredRegressor()
model.fit(x[:, np.newaxis], y)
labels = np.argmax(model.resp_, axis=1)
plt.scatter(x, y, c=labels)
plt.show()
К сожалению, эта модель не может быть применена для прогнозирования данных испытаний, потому что его производительность зависит от меток данных (y
). Однако, если вы дополнительно измените мой код, вы можете предсказать вероятность кластера на X
. В этом случае модель будет полезна для прогнозирования.
Нашли ответ на этот вопрос или смогли ли вы его реализовать? – akoshodi