AutoML Pipeline Setup Expert
Эксперт по проектированию и реализации автоматизированных систем машинного обучения.
Архитектура пайплайна
Модульные компоненты
Data Ingestion → Validation → Feature Engineering → Model Training → Evaluation → Deployment
Конфигурация через YAML
pipeline: name: customer_churn_prediction version: "1.0"
data: source: "s3://bucket/data.parquet" validation: null_threshold: 0.1 duplicate_check: true
features: numerical: - age - tenure - monthly_charges categorical: - contract_type - payment_method target: churn
automl: framework: h2o max_runtime_secs: 3600 max_models: 20 stopping_metric: AUC sort_metric: AUC
deployment: platform: mlflow model_registry: true
Data Validation с Great Expectations
import great_expectations as gx
def validate_data(df, expectation_suite_name="default"): context = gx.get_context()
# Создание expectation suite
suite = context.add_expectation_suite(expectation_suite_name)
# Определение expectations
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name
)
# Проверки качества данных
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_between("age", min_value=18, max_value=100)
validator.expect_column_values_to_be_in_set(
"contract_type",
["month-to-month", "one_year", "two_year"]
)
# Валидация
results = validator.validate()
return results.success
Feature Engineering Pipeline
from sklearn.pipeline import Pipeline from sklearn.compose import ColumnTransformer from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.impute import SimpleImputer from feature_engine.creation import CyclicalFeatures from feature_engine.selection import DropCorrelatedFeatures
def create_feature_pipeline(numerical_cols, categorical_cols): numerical_transformer = Pipeline([ ('imputer', SimpleImputer(strategy='median')), ('scaler', StandardScaler()) ])
categorical_transformer = Pipeline([
('imputer', SimpleImputer(strategy='most_frequent')),
('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])
preprocessor = ColumnTransformer([
('num', numerical_transformer, numerical_cols),
('cat', categorical_transformer, categorical_cols)
])
feature_pipeline = Pipeline([
('preprocessor', preprocessor),
('drop_correlated', DropCorrelatedFeatures(threshold=0.95))
])
return feature_pipeline
H2O AutoML
import h2o from h2o.automl import H2OAutoML
def train_automl_model(train_df, target_col, config): h2o.init()
# Конвертация в H2O Frame
h2o_train = h2o.H2OFrame(train_df)
# Определение типов колонок
h2o_train[target_col] = h2o_train[target_col].asfactor()
# Предикторы
predictors = [col for col in h2o_train.columns if col != target_col]
# AutoML
aml = H2OAutoML(
max_runtime_secs=config.get('max_runtime_secs', 3600),
max_models=config.get('max_models', 20),
stopping_metric=config.get('stopping_metric', 'AUC'),
sort_metric=config.get('sort_metric', 'AUC'),
seed=42,
exclude_algos=['DeepLearning'], # Опционально исключить алгоритмы
nfolds=5
)
aml.train(
x=predictors,
y=target_col,
training_frame=h2o_train
)
# Лидерборд
leaderboard = aml.leaderboard.as_data_frame()
print(leaderboard)
return aml.leader
MLflow Experiment Tracking
import mlflow from mlflow.tracking import MlflowClient
class ExperimentTracker: def init(self, experiment_name): mlflow.set_experiment(experiment_name) self.client = MlflowClient()
def log_automl_run(self, model, metrics, params, artifacts_path=None):
with mlflow.start_run():
# Логирование параметров
for key, value in params.items():
mlflow.log_param(key, value)
# Логирование метрик
for key, value in metrics.items():
mlflow.log_metric(key, value)
# Логирование модели
mlflow.h2o.log_model(model, "model")
# Логирование артефактов
if artifacts_path:
mlflow.log_artifacts(artifacts_path)
run_id = mlflow.active_run().info.run_id
return run_id
def register_best_model(self, run_id, model_name):
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, model_name)
Optuna для Hyperparameter Tuning
import optuna from sklearn.model_selection import cross_val_score from sklearn.ensemble import RandomForestClassifier
def objective(trial, X, y): params = { 'n_estimators': trial.suggest_int('n_estimators', 50, 500), 'max_depth': trial.suggest_int('max_depth', 3, 20), 'min_samples_split': trial.suggest_int('min_samples_split', 2, 20), 'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10), 'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]) }
model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
scores = cross_val_score(model, X, y, cv=5, scoring='roc_auc')
return scores.mean()
def run_optimization(X, y, n_trials=100): study = optuna.create_study(direction='maximize') study.optimize(lambda trial: objective(trial, X, y), n_trials=n_trials)
print(f"Best trial: {study.best_trial.value}")
print(f"Best params: {study.best_params}")
return study.best_params
Airflow DAG для оркестрации
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta
default_args = { 'owner': 'ml-team', 'depends_on_past': False, 'email_on_failure': True, 'retries': 1, 'retry_delay': timedelta(minutes=5) }
dag = DAG( 'automl_pipeline', default_args=default_args, description='AutoML Training Pipeline', schedule_interval='@daily', start_date=datetime(2024, 1, 1), catchup=False )
validate_data_task = PythonOperator( task_id='validate_data', python_callable=validate_data, dag=dag )
feature_engineering_task = PythonOperator( task_id='feature_engineering', python_callable=run_feature_engineering, dag=dag )
automl_training_task = PythonOperator( task_id='automl_training', python_callable=train_automl_model, dag=dag )
model_validation_task = PythonOperator( task_id='model_validation', python_callable=validate_model, dag=dag )
validate_data_task >> feature_engineering_task >> automl_training_task >> model_validation_task
Рекомендации по фреймворкам
Сценарий Рекомендация
Enterprise, табличные данные H2O.ai
Cloud-native Google Vertex AI, AWS SageMaker
Быстрое прототипирование AutoGluon, FLAML
Кастомизация MLflow + Optuna
Deep Learning AutoKeras, Neural Architecture Search
Лучшие практики
-
Data sampling — для ускорения экспериментов
-
Early stopping — прекращение неперспективных моделей
-
Resource management — лимиты памяти и CPU
-
Distributed training — Ray Tune, Dask для масштабирования
-
Model versioning — отслеживание всех экспериментов
-
Reproducibility — фиксация random seeds