camel-test3-17/camel-jdbc-yml/UserProcessor.java

309 lines
12 KiB
Java
Raw Normal View History

2025-03-13 21:19:48 +08:00
package com.example.processor;
import com.example.model.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Configuration;
import org.apache.camel.BindToRegistry;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Camel {@link Processor} that dispatches on the {@code operation} message header to one of
 * several steps: parsing a user JSON body, preparing SQL parameter maps (create, update,
 * paging, advanced search) and shaping query results or errors into response bodies.
 *
 * <p>Bound to the Camel registry under the name {@code userProcessor}.
 */
@Configuration
@BindToRegistry("userProcessor")
public class UserProcessor implements Processor {

    private static final Logger LOG = LoggerFactory.getLogger(UserProcessor.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Page number used when the {@code page} header is missing or unparsable. */
    private static final int DEFAULT_PAGE = 1;
    /** Page size used when the {@code size} header is missing or unparsable. */
    private static final int DEFAULT_PAGE_SIZE = 10;
    /** Upper bound applied to the requested page size. */
    private static final int MAX_PAGE_SIZE = 100;

    /**
     * Reads the {@code operation} header and runs the matching step on the exchange.
     *
     * <p>A missing or unrecognised operation sets the body to an "unknown operation" text and
     * the HTTP response code header to 400.
     *
     * @param exchange the exchange whose IN message is read and modified in place
     * @throws Exception if the selected step fails (for example, invalid JSON input)
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        String operation = exchange.getIn().getHeader("operation", String.class);
        LOG.info("处理用户操作: {}", operation);
        if (operation == null) {
            // switch on a null String throws NullPointerException; treat it as unknown instead.
            rejectUnknownOperation(exchange, null);
            return;
        }
        switch (operation) {
            case "transformUserInput":
                transformUserInput(exchange);
                break;
            case "prepareUserResponse":
                prepareUserResponse(exchange);
                break;
            case "prepareUsersResponse":
                prepareUsersResponse(exchange);
                break;
            case "preparePagedUsersResponse":
                preparePagedUsersResponse(exchange);
                break;
            case "preparePageParams":
                preparePageParams(exchange);
                break;
            case "prepareAdvancedSearchParams":
                prepareAdvancedSearchParams(exchange);
                break;
            case "prepareErrorResponse":
                prepareErrorResponse(exchange);
                break;
            case "prepareCreateUserParams":
                prepareCreateUserParams(exchange);
                break;
            case "prepareUpdateUserParams":
                prepareUpdateUserParams(exchange);
                break;
            default:
                rejectUnknownOperation(exchange, operation);
        }
    }

    /** Answers an unknown (or missing) operation with a 400 response code header. */
    private void rejectUnknownOperation(Exchange exchange, String operation) {
        LOG.warn("未知操作: {}", operation);
        exchange.getIn().setBody("未知操作");
        exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
    }

    /** Parses the JSON string body into a {@link User} and replaces the body with it. */
    private void transformUserInput(Exchange exchange) throws Exception {
        String body = exchange.getIn().getBody(String.class);
        User user = OBJECT_MAPPER.readValue(body, User.class);
        exchange.getIn().setBody(user);
    }

    /**
     * Converts the first result row into a {@link User}; when there are no rows, sets a JSON
     * "user not found" message body and a 404 response code header.
     */
    private void prepareUserResponse(Exchange exchange) {
        List<Map<String, Object>> rows = readRows(exchange);
        if (rows.isEmpty()) {
            exchange.getIn().setBody("{\"message\": \"用户不存在\"}");
            exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, 404);
            return;
        }
        exchange.getIn().setBody(mapToUser(rows.get(0)));
    }

    /** Converts every result row into a {@link User} and sets the list as the body. */
    private void prepareUsersResponse(Exchange exchange) {
        List<User> users = mapToUsers(readRows(exchange));
        LOG.info("进入处理数据过程方法获取到的list: {}", users);
        exchange.getIn().setBody(users);
    }

    /**
     * Builds a paged response map ({@code content}, {@code totalElements}, {@code totalPages},
     * {@code page}, {@code size}, {@code numberOfElements}) from the result rows, the
     * {@code totalCount} exchange property and the {@code page}/{@code size} headers.
     */
    private void preparePagedUsersResponse(Exchange exchange) {
        List<User> users = mapToUsers(readRows(exchange));

        // Total record count; absent means zero.
        Long totalProperty = exchange.getProperty("totalCount", Long.class);
        long total = totalProperty != null ? totalProperty : 0L;

        // Paging headers may be absent: unboxing null would throw, so fall back to defaults.
        Integer pageHeader = exchange.getIn().getHeader("page", Integer.class);
        Integer sizeHeader = exchange.getIn().getHeader("size", Integer.class);
        int page = pageHeader != null ? pageHeader : DEFAULT_PAGE;
        // A size below 1 would make the page count meaningless (division by zero).
        int size = Math.max(1, sizeHeader != null ? sizeHeader : DEFAULT_PAGE_SIZE);

        // Integer ceiling division, saturated to the int range.
        long totalPages = (total + size - 1) / size;

        Map<String, Object> response = new HashMap<>();
        response.put("content", users);
        response.put("totalElements", total);
        response.put("totalPages", (int) Math.min(totalPages, Integer.MAX_VALUE));
        response.put("page", page);
        response.put("size", size);
        response.put("numberOfElements", users.size());
        exchange.getIn().setBody(response);
    }

    /**
     * Normalises the {@code page}/{@code size} headers, publishes {@code page}, {@code size},
     * {@code offset} and {@code limit} headers, and sets a {@code limit}/{@code offset} map as
     * the body for the SQL query.
     */
    private void preparePageParams(Exchange exchange) {
        PageRequest paging = readPageRequest(exchange);
        applyPagingHeaders(exchange, paging);

        // Parameter map used by the SQL query.
        Map<String, Object> params = new HashMap<>();
        params.put("limit", paging.size);
        params.put("offset", paging.offset);
        exchange.getIn().setBody(params);
        LOG.info("分页参数: page={}, size={}, offset={}", paging.page, paging.size, paging.offset);
    }

    /**
     * Normalises paging, filter and sort headers for the advanced search, publishes the
     * normalised values as headers and sets the full parameter map as the body.
     *
     * <p>An unrecognised sort field is replaced by the empty string and an unrecognised sort
     * order by {@code DESC}.
     */
    private void prepareAdvancedSearchParams(Exchange exchange) {
        PageRequest paging = readPageRequest(exchange);

        // Filter parameters; absent headers become empty strings.
        String username = exchange.getIn().getHeader("username", "", String.class);
        String email = exchange.getIn().getHeader("email", "", String.class);
        String phone = exchange.getIn().getHeader("phone", "", String.class);
        String activeStr = exchange.getIn().getHeader("active", "", String.class);
        String fromDate = exchange.getIn().getHeader("fromDate", "", String.class);
        String toDate = exchange.getIn().getHeader("toDate", "", String.class);

        // Sort parameters.
        String sortField = exchange.getIn().getHeader("sortField", "", String.class);
        String sortOrder = exchange.getIn().getHeader("sortOrder", "DESC", String.class);

        // Only whitelisted sort fields are passed on; anything else becomes "" so the
        // query falls back to its default ordering.
        if (!isValidSortField(sortField)) {
            sortField = "";
        }
        // Accept "asc"/"desc" in any case; everything else falls back to DESC.
        sortOrder = sortOrder.trim().toUpperCase(java.util.Locale.ROOT);
        if (!sortOrder.equals("ASC") && !sortOrder.equals("DESC")) {
            sortOrder = "DESC";
        }

        // Tri-state flag: null means "do not filter on active".
        Boolean active = null;
        if (!activeStr.isEmpty()) {
            active = Boolean.parseBoolean(activeStr);
        }

        applyPagingHeaders(exchange, paging);
        exchange.getIn().setHeader("active", active);
        exchange.getIn().setHeader("sortField", sortField);
        exchange.getIn().setHeader("sortOrder", sortOrder);

        // Parameter map used by the SQL query.
        Map<String, Object> params = new HashMap<>();
        params.put("username", username);
        params.put("email", email);
        params.put("phone", phone);
        params.put("active", active);
        params.put("fromDate", fromDate);
        params.put("toDate", toDate);
        params.put("sortField", sortField);
        params.put("sortOrder", sortOrder);
        params.put("limit", paging.size);
        params.put("offset", paging.offset);
        exchange.getIn().setBody(params);
        LOG.info("高级查询参数: page={}, size={}, username={}, email={}, active={}, sortField={}, sortOrder={}",
                paging.page, paging.size, username, email, active, sortField, sortOrder);
    }

    /** Returns whether {@code field} is one of the sort fields this processor lets through. */
    private boolean isValidSortField(String field) {
        return "username".equals(field) || "email".equals(field) || "createTime".equals(field);
    }

    /**
     * Sets an error map ({@code error}, {@code message}) as the body and a 500 response code
     * header, using the message of the caught exception when one is available.
     */
    private void prepareErrorResponse(Exchange exchange) {
        Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
        // getMessage() may itself be null (e.g. NullPointerException); never emit a null message.
        String errorMessage = exception != null ? exception.getMessage() : null;
        if (errorMessage == null || errorMessage.isEmpty()) {
            errorMessage = "未知错误";
        }
        // NOTE(review): the raw exception message is returned to the caller; confirm that
        // exposing internal error details is acceptable for this API.
        Map<String, Object> errorResponse = new HashMap<>();
        errorResponse.put("error", true);
        errorResponse.put("message", errorMessage);
        exchange.getIn().setBody(errorResponse);
        exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
    }

    /**
     * Copies the fields of the {@link User} body into message headers and into a parameter
     * map that replaces the body.
     *
     * @throws IllegalArgumentException if the body cannot be read as a {@link User}
     */
    private void prepareCreateUserParams(Exchange exchange) {
        User user = requireUser(exchange);
        publishParams(exchange, userParams(user));
    }

    /**
     * Same as {@link #prepareCreateUserParams(Exchange)} but additionally publishes the numeric
     * {@code id} parsed from the {@code userId} header.
     *
     * @throws IllegalArgumentException if the body is not a {@link User} or the {@code userId}
     *     header is missing or not a valid long ({@link NumberFormatException} in that case)
     */
    private void prepareUpdateUserParams(Exchange exchange) {
        User user = requireUser(exchange);
        String userId = exchange.getIn().getHeader("userId", String.class);
        if (userId == null || userId.trim().isEmpty()) {
            throw new IllegalArgumentException("缺少 userId 请求头");
        }
        Map<String, Object> params = userParams(user);
        params.put("id", Long.parseLong(userId.trim()));
        publishParams(exchange, params);
    }

    /** Reads the body as a {@link User}, failing with a clear message when it is absent. */
    private User requireUser(Exchange exchange) {
        User user = exchange.getIn().getBody(User.class);
        if (user == null) {
            throw new IllegalArgumentException("请求体缺少用户数据");
        }
        return user;
    }

    /** Builds the SQL parameter map for the writable fields of {@code user}. */
    private Map<String, Object> userParams(User user) {
        Map<String, Object> params = new HashMap<>();
        params.put("username", user.getUsername());
        params.put("email", user.getEmail());
        params.put("password", user.getPassword());
        params.put("fullName", user.getFullName());
        params.put("phone", user.getPhone());
        params.put("active", user.isActive());
        return params;
    }

    /** Sets every entry of {@code params} as a header and the map itself as the body. */
    private void publishParams(Exchange exchange, Map<String, Object> params) {
        // NOTE(review): this places the password in a message header; confirm the route
        // strips it before headers can reach logs or an outgoing response.
        params.forEach(exchange.getIn()::setHeader);
        exchange.getIn().setBody(params);
    }

    /** Reads the body as a list of row maps; a null body yields an empty list. */
    @SuppressWarnings("unchecked") // getBody(List.class) is raw; rows are assumed to be column maps
    private List<Map<String, Object>> readRows(Exchange exchange) {
        List<Map<String, Object>> rows = exchange.getIn().getBody(List.class);
        return rows != null ? rows : new ArrayList<>();
    }

    /** Maps each row to a {@link User}, preserving order. */
    private List<User> mapToUsers(List<Map<String, Object>> rows) {
        List<User> users = new ArrayList<>(rows.size());
        for (Map<String, Object> row : rows) {
            users.add(mapToUser(row));
        }
        return users;
    }

    /**
     * Reads and normalises the {@code page}/{@code size} headers: blank or non-numeric values
     * fall back to the defaults, the page is at least 1 and the size is within
     * 1..{@value #MAX_PAGE_SIZE}.
     */
    private PageRequest readPageRequest(Exchange exchange) {
        String pageParam = exchange.getIn().getHeader("page", String.valueOf(DEFAULT_PAGE), String.class);
        String sizeParam = exchange.getIn().getHeader("size", String.valueOf(DEFAULT_PAGE_SIZE), String.class);
        int page = Math.max(1, parseIntOrDefault(pageParam, DEFAULT_PAGE));
        int size = Math.max(1, Math.min(MAX_PAGE_SIZE, parseIntOrDefault(sizeParam, DEFAULT_PAGE_SIZE)));
        // Multiply in long: (page - 1) * size overflows int for very large page numbers.
        long offset = (long) (page - 1) * size;
        return new PageRequest(page, size, (int) Math.min(offset, Integer.MAX_VALUE));
    }

    /** Publishes the normalised paging values as {@code page/size/offset/limit} headers. */
    private void applyPagingHeaders(Exchange exchange, PageRequest paging) {
        exchange.getIn().setHeader("page", paging.page);
        exchange.getIn().setHeader("size", paging.size);
        exchange.getIn().setHeader("offset", paging.offset);
        exchange.getIn().setHeader("limit", paging.size);
    }

    /** Parses {@code raw} as an int, returning {@code fallback} for null, blank or invalid text. */
    private static int parseIntOrDefault(String raw, int fallback) {
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException ignored) {
            // Client-supplied paging values are untrusted; a bad value should not fail the request.
            LOG.warn("无效的分页参数: {}, 使用默认值 {}", raw, fallback);
            return fallback;
        }
    }

    /**
     * Converts one result row (keyed by column name) into a {@link User}. The password is
     * always set to {@code null} so that it is never returned.
     */
    private User mapToUser(Map<String, Object> result) {
        User user = new User();
        user.setId(((Number) result.get("id")).longValue());
        user.setUsername((String) result.get("username"));
        user.setEmail((String) result.get("email"));
        user.setPassword(null); // for security reasons the password is never returned
        user.setFullName((String) result.get("full_name"));
        user.setPhone((String) result.get("phone"));
        user.setCreateTime(toTimestamp(result.get("create_time")));
        user.setUpdateTime(toTimestamp(result.get("update_time")));
        user.setActive(toBoolean(result.get("active")));
        return user;
    }

    /**
     * Converts a column value to a {@link java.sql.Timestamp}. Accepts the temporal types a
     * JDBC driver may hand back ({@code Timestamp}, {@code java.util.Date},
     * {@code LocalDateTime}); {@code null} stays {@code null}.
     */
    private static java.sql.Timestamp toTimestamp(Object value) {
        if (value instanceof java.time.LocalDateTime) {
            return java.sql.Timestamp.valueOf((java.time.LocalDateTime) value);
        }
        if (value instanceof java.util.Date && !(value instanceof java.sql.Timestamp)) {
            return new java.sql.Timestamp(((java.util.Date) value).getTime());
        }
        // Any other unexpected type fails with ClassCastException, as before.
        return (java.sql.Timestamp) value;
    }

    /**
     * Converts a column value to a {@link Boolean}. Numeric flags (non-zero means true) are
     * accepted in addition to real booleans; {@code null} stays {@code null}.
     */
    private static Boolean toBoolean(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue() != 0;
        }
        // Any other unexpected type fails with ClassCastException, as before.
        return (Boolean) value;
    }

    /** Immutable holder for normalised paging values. */
    private static final class PageRequest {
        final int page;
        final int size;
        final int offset;

        PageRequest(int page, int size, int offset) {
            this.page = page;
            this.size = size;
            this.offset = offset;
        }
    }
}