On CentOS, HDFS commonly serves as the data-lake foundation for machine learning: it provides unified, high-throughput storage for massive training datasets, engineered features, and model artifacts, supporting the full pipeline from offline training to online inference.
/mlworkspace/
├── datasets/      # raw data (e.g. imagenet, tabular)
├── preprocessed/  # feature-engineering output
├── models/        # model files (v1, v2, …)
└── experiments/   # intermediate experiment results and logs
Create the workspace layout and set baseline permissions:

hdfs dfs -mkdir -p /mlworkspace/{datasets,preprocessed,models,experiments}
hdfs dfs -chmod -R 755 /mlworkspace

Load local training data into HDFS with DistCp, then version datasets by renaming them with a dated suffix:

hadoop distcp file:///local/training_data hdfs://namenode:9000/mlworkspace/datasets/
hdfs dfs -mv /mlworkspace/datasets/imagenet /mlworkspace/datasets/imagenet_20250415_v1

Training jobs built with SparkSession.builder.master("yarn") read the data from HDFS, run feature engineering, training, and evaluation, with MLflow tracking metrics and model versions. To stream application logs into HDFS continuously, a Flume agent can tail the log file:

agent1.sources = source1
agent1.channels = channel1
agent1.sinks = sink1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /var/log/user_behavior.log
agent1.sources.source1.channels = channel1
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 100000
agent1.channels.channel1.transactionCapacity = 1000
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://namenode:9000/user/data/user_behavior/%Y-%m-%d
agent1.sinks.sink1.hdfs.filePrefix = user_behavior-
agent1.sinks.sink1.hdfs.fileType = DataStream
# escape sequences like %Y-%m-%d require a timestamp header; take it from local time
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.sink1.channel = channel1
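The hdfs.path above uses the escape sequence %Y-%m-%d, so Flume writes each day's events into a separate date-named directory. The substitution behaves like strftime on the event timestamp; a small Python sketch of the resulting layout (partition_path is a hypothetical helper, the date is an example):

```python
from datetime import date

def partition_path(d: date) -> str:
    # Flume expands %Y-%m-%d from the event timestamp,
    # producing one HDFS directory per day
    return f"hdfs://namenode:9000/user/data/user_behavior/{d.strftime('%Y-%m-%d')}"

print(partition_path(date(2025, 4, 15)))
# → hdfs://namenode:9000/user/data/user_behavior/2025-04-15
```

Daily partitions keep downstream Spark jobs simple: a job can read exactly one day's directory instead of filtering a monolithic dataset.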
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
spark = SparkSession.builder.appName("ALSRec").master("yarn").getOrCreate()
# load the user behavior events collected by Flume
df = spark.read.parquet("/user/data/user_behavior")
# hold out a test set (80/20 split)
train, test = df.randomSplit([0.8, 0.2], seed=42)
als = ALS(userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(train)
preds = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(preds)
print(f"Test RMSE = {rmse:.4f}")
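RegressionEvaluator with metricName="rmse" computes the root-mean-square error between the rating and prediction columns. A minimal plain-Python sketch of the same metric, using toy numbers rather than output from the job above:

```python
import math

def rmse(labels, preds):
    # root of the mean squared difference between labels and predictions
    return math.sqrt(sum((l - p) ** 2 for l, p in zip(labels, preds)) / len(labels))

print(rmse([4.0, 3.0, 5.0], [3.5, 3.0, 4.0]))  # ≈ 0.6455
```

Lower is better; because the errors are squared before averaging, a few badly mispredicted ratings dominate the score, which is why coldStartStrategy="drop" is set above to exclude NaN predictions for unseen users or items.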