camel-test3-17/camel-jdbc-yml/UserProcessor.java

309 lines
12 KiB
Java
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.example.processor;
import com.example.model.User;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Configuration;
import org.apache.camel.BindToRegistry;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Camel {@link Processor} that prepares query parameters and shapes responses for the
 * user routes. It is bound to the Camel registry under the name {@code userProcessor}.
 *
 * <p>The action to perform is selected by the {@code operation} message header; see
 * {@link #process(Exchange)} for the accepted values. A missing or unrecognised
 * operation produces an HTTP 400 response instead of an exception.
 */
@Configuration
@BindToRegistry("userProcessor")
public class UserProcessor implements Processor {

    private static final Logger LOG = LoggerFactory.getLogger(UserProcessor.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Page number used when the {@code page} header is missing or not numeric. */
    private static final int DEFAULT_PAGE = 1;
    /** Page size used when the {@code size} header is missing or not numeric. */
    private static final int DEFAULT_SIZE = 10;
    /** Upper bound applied to the requested page size. */
    private static final int MAX_SIZE = 100;

    /**
     * Dispatches to the handler named by the {@code operation} header.
     *
     * @param exchange the current exchange; its IN message is read and modified in place
     * @throws Exception whatever the selected handler throws (e.g. JSON parsing errors)
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        String operation = exchange.getIn().getHeader("operation", String.class);
        LOG.info("处理用户操作: {}", operation);
        if (operation == null) {
            // switch on a null String would throw NullPointerException; treat it as unknown.
            rejectUnknownOperation(exchange, null);
            return;
        }
        switch (operation) {
            case "transformUserInput":
                transformUserInput(exchange);
                break;
            case "prepareUserResponse":
                prepareUserResponse(exchange);
                break;
            case "prepareUsersResponse":
                prepareUsersResponse(exchange);
                break;
            case "preparePagedUsersResponse":
                preparePagedUsersResponse(exchange);
                break;
            case "preparePageParams":
                preparePageParams(exchange);
                break;
            case "prepareAdvancedSearchParams":
                prepareAdvancedSearchParams(exchange);
                break;
            case "prepareErrorResponse":
                prepareErrorResponse(exchange);
                break;
            case "prepareCreateUserParams":
                prepareCreateUserParams(exchange);
                break;
            case "prepareUpdateUserParams":
                prepareUpdateUserParams(exchange);
                break;
            default:
                rejectUnknownOperation(exchange, operation);
        }
    }

    /** Answers an unknown (or missing) operation with a fixed body and HTTP status 400. */
    private void rejectUnknownOperation(Exchange exchange, String operation) {
        LOG.warn("未知操作: {}", operation);
        exchange.getIn().setBody("未知操作");
        exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, 400);
    }

    /** Parses the JSON string body into a {@code User} and replaces the body with it. */
    private void transformUserInput(Exchange exchange) throws Exception {
        String json = exchange.getIn().getBody(String.class);
        User user = objectMapper.readValue(json, User.class);
        exchange.getIn().setBody(user);
    }

    /**
     * Converts the first result row into a {@code User}. When there are no rows the body
     * becomes a JSON "not found" message and the HTTP status header is set to 404.
     */
    private void prepareUserResponse(Exchange exchange) throws Exception {
        List<Map<String, Object>> rows = rowsFromBody(exchange);
        if (rows.isEmpty()) {
            exchange.getIn().setBody("{\"message\": \"用户不存在\"}");
            exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, 404);
            return;
        }
        exchange.getIn().setBody(mapToUser(rows.get(0)));
    }

    /** Converts every result row into a {@code User} and sets the list as the body. */
    private void prepareUsersResponse(Exchange exchange) throws Exception {
        List<User> users = toUsers(rowsFromBody(exchange));
        LOG.info("进入处理数据过程方法获取到的list: {}", users);
        exchange.getIn().setBody(users);
    }

    /**
     * Builds a page envelope (content, totalElements, totalPages, page, size,
     * numberOfElements) from the result rows, the {@code totalCount} exchange property
     * and the {@code page}/{@code size} headers.
     */
    private void preparePagedUsersResponse(Exchange exchange) throws Exception {
        List<User> users = toUsers(rowsFromBody(exchange));

        // Total number of records; absent property is treated as zero.
        Long total = exchange.getProperty("totalCount", Long.class);
        if (total == null) {
            total = 0L;
        }

        // Missing headers previously caused a NullPointerException on unboxing, and a
        // size of 0 made the page count overflow; fall back to defaults and floor size at 1.
        int page = intHeader(exchange, "page", DEFAULT_PAGE);
        int size = Math.max(1, intHeader(exchange, "size", DEFAULT_SIZE));

        Map<String, Object> response = new HashMap<>();
        response.put("content", users);
        response.put("totalElements", total);
        response.put("totalPages", (int) Math.ceil((double) total / size));
        response.put("page", page);
        response.put("size", size);
        response.put("numberOfElements", users.size());
        exchange.getIn().setBody(response);
    }

    /**
     * Normalises the {@code page}/{@code size} headers (defaults: page 1, size 10; size
     * capped at 100), publishes page/size/offset/limit headers and sets the body to a
     * {@code limit}/{@code offset} parameter map.
     */
    private void preparePageParams(Exchange exchange) {
        PageRequest paging = readPageRequest(exchange);
        publishPagingHeaders(exchange, paging);

        // Parameter map consumed by the SQL query.
        Map<String, Object> params = new HashMap<>();
        params.put("limit", paging.size);
        params.put("offset", paging.offset);
        exchange.getIn().setBody(params);
        LOG.info("分页参数: page={}, size={}, offset={}", paging.page, paging.size, paging.offset);
    }

    /**
     * Collects paging, filter and sort headers, validates them, republishes the
     * normalised values as headers and sets the body to the full query parameter map.
     */
    private void prepareAdvancedSearchParams(Exchange exchange) {
        PageRequest paging = readPageRequest(exchange);

        // Filter criteria; an absent header becomes the empty string.
        String username = exchange.getIn().getHeader("username", "", String.class);
        String email = exchange.getIn().getHeader("email", "", String.class);
        String phone = exchange.getIn().getHeader("phone", "", String.class);
        String activeStr = exchange.getIn().getHeader("active", "", String.class);
        String fromDate = exchange.getIn().getHeader("fromDate", "", String.class);
        String toDate = exchange.getIn().getHeader("toDate", "", String.class);

        // Sort criteria.
        String sortField = exchange.getIn().getHeader("sortField", "", String.class);
        String sortOrder = exchange.getIn().getHeader("sortOrder", "DESC", String.class);

        // An unsupported sort field is blanked out (the original comment says the
        // query then falls back to sorting by id — not verifiable from this class).
        if (!sortField.isEmpty() && !isValidSortField(sortField)) {
            sortField = "";
        }
        // Accept "asc"/"ASC" in any case; anything else falls back to DESC. The stored
        // value is always the canonical upper-case keyword.
        sortOrder = "ASC".equalsIgnoreCase(sortOrder) ? "ASC" : "DESC";

        // Tri-state flag: null means "do not filter on active".
        Boolean active = null;
        if (!activeStr.isEmpty()) {
            active = Boolean.parseBoolean(activeStr);
        }

        publishPagingHeaders(exchange, paging);
        exchange.getIn().setHeader("active", active);
        exchange.getIn().setHeader("sortField", sortField);
        exchange.getIn().setHeader("sortOrder", sortOrder);

        // Parameter map consumed by the SQL query.
        Map<String, Object> params = new HashMap<>();
        params.put("username", username);
        params.put("email", email);
        params.put("phone", phone);
        params.put("active", active);
        params.put("fromDate", fromDate);
        params.put("toDate", toDate);
        params.put("sortField", sortField);
        params.put("sortOrder", sortOrder);
        params.put("limit", paging.size);
        params.put("offset", paging.offset);
        exchange.getIn().setBody(params);
        LOG.info("高级查询参数: page={}, size={}, username={}, email={}, active={}, sortField={}, sortOrder={}",
                paging.page, paging.size, username, email, active, sortField, sortOrder);
    }

    /** Returns whether {@code field} is one of the sort keys this processor accepts. */
    private boolean isValidSortField(String field) {
        return "username".equals(field) || "email".equals(field) || "createTime".equals(field);
    }

    /**
     * Replaces the body with an error map built from the caught exception and sets the
     * HTTP status header to 500.
     */
    private void prepareErrorResponse(Exchange exchange) {
        Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
        // Exceptions may legitimately carry a null message; use the same default then.
        String errorMessage = exception != null ? exception.getMessage() : null;
        if (errorMessage == null) {
            errorMessage = "未知错误";
        }
        // NOTE(review): the raw exception message is returned to the caller — confirm
        // this cannot leak internal details (SQL text, host names) to clients.
        Map<String, Object> errorResponse = new HashMap<>();
        errorResponse.put("error", true);
        errorResponse.put("message", errorMessage);
        exchange.getIn().setBody(errorResponse);
        exchange.getIn().setHeader(Exchange.HTTP_RESPONSE_CODE, 500);
    }

    /**
     * Copies the {@code User} body's fields into both message headers and a parameter
     * map, which becomes the new body.
     *
     * @throws IllegalArgumentException if the body is missing or not a {@code User}
     */
    private void prepareCreateUserParams(Exchange exchange) {
        User user = requireUserBody(exchange);
        Map<String, Object> params = userFieldParams(user);
        copyToHeaders(exchange, params);
        exchange.getIn().setBody(params);
    }

    /**
     * Same as {@link #prepareCreateUserParams(Exchange)} but additionally publishes the
     * numeric {@code id} parsed from the {@code userId} header.
     *
     * @throws IllegalArgumentException if the body is missing or not a {@code User}
     * @throws NumberFormatException if the {@code userId} header is missing or not numeric
     */
    private void prepareUpdateUserParams(Exchange exchange) {
        User user = requireUserBody(exchange);
        // Parse once and reuse for both the header and the parameter map.
        Long id = Long.valueOf(exchange.getIn().getHeader("userId", String.class));
        Map<String, Object> params = userFieldParams(user);
        params.put("id", id);
        copyToHeaders(exchange, params);
        exchange.getIn().setBody(params);
    }

    /**
     * Converts one result row into a {@code User}. The password is always cleared so it
     * is never returned to callers.
     *
     * @throws IllegalStateException if the row has no {@code id} value
     */
    private User mapToUser(Map<String, Object> result) {
        Object id = result.get("id");
        if (id == null) {
            throw new IllegalStateException("Result row has no 'id' value; columns present: " + result.keySet());
        }
        User user = new User();
        user.setId(((Number) id).longValue());
        user.setUsername((String) result.get("username"));
        user.setEmail((String) result.get("email"));
        user.setPassword(null); // for security reasons the password is never returned
        user.setFullName((String) result.get("full_name"));
        user.setPhone((String) result.get("phone"));
        user.setCreateTime(toTimestamp(result.get("create_time")));
        user.setUpdateTime(toTimestamp(result.get("update_time")));
        user.setActive(toBooleanFlag(result.get("active")));
        return user;
    }

    /**
     * Returns the message body as a list of row maps, or an empty list when the body is
     * absent or not convertible to a {@link List}.
     */
    @SuppressWarnings("unchecked") // element type is not checkable; rows are produced by the query step
    private static List<Map<String, Object>> rowsFromBody(Exchange exchange) {
        List<Map<String, Object>> rows = exchange.getIn().getBody(List.class);
        if (rows == null) {
            rows = new ArrayList<>();
        }
        return rows;
    }

    /** Maps every row with {@link #mapToUser(Map)}, preserving order. */
    private List<User> toUsers(List<Map<String, Object>> rows) {
        List<User> users = new ArrayList<>(rows.size());
        for (Map<String, Object> row : rows) {
            users.add(mapToUser(row));
        }
        return users;
    }

    /**
     * Reads an integer header leniently: numeric header values are used directly, other
     * values are parsed from their string form, and missing or malformed values yield
     * {@code fallback} instead of an exception.
     */
    private static int intHeader(Exchange exchange, String name, int fallback) {
        Object raw = exchange.getIn().getHeader(name);
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        if (raw != null) {
            try {
                return Integer.parseInt(raw.toString().trim());
            } catch (NumberFormatException ignored) {
                // Client-supplied value is not a number; fall through to the default.
            }
        }
        return fallback;
    }

    /** Reads and clamps paging headers: page >= 1, 1 <= size <= {@link #MAX_SIZE}. */
    private static PageRequest readPageRequest(Exchange exchange) {
        int page = Math.max(1, intHeader(exchange, "page", DEFAULT_PAGE));
        int size = Math.max(1, Math.min(MAX_SIZE, intHeader(exchange, "size", DEFAULT_SIZE)));
        // Multiply in long and saturate so a huge page number cannot wrap to a negative offset.
        long wideOffset = (long) (page - 1) * size;
        int offset = (int) Math.min(wideOffset, (long) Integer.MAX_VALUE);
        return new PageRequest(page, size, offset);
    }

    /** Publishes the normalised paging values as page/size/offset/limit headers. */
    private static void publishPagingHeaders(Exchange exchange, PageRequest paging) {
        exchange.getIn().setHeader("page", paging.page);
        exchange.getIn().setHeader("size", paging.size);
        exchange.getIn().setHeader("offset", paging.offset);
        exchange.getIn().setHeader("limit", paging.size);
    }

    /** Returns the body as a {@code User}, failing with a descriptive message when absent. */
    private static User requireUserBody(Exchange exchange) {
        User user = exchange.getIn().getBody(User.class);
        if (user == null) {
            throw new IllegalArgumentException("Message body is missing or cannot be converted to User");
        }
        return user;
    }

    /** Builds the parameter map holding the user's writable fields. */
    private static Map<String, Object> userFieldParams(User user) {
        Map<String, Object> params = new HashMap<>();
        params.put("username", user.getUsername());
        params.put("email", user.getEmail());
        params.put("password", user.getPassword());
        params.put("fullName", user.getFullName());
        params.put("phone", user.getPhone());
        params.put("active", user.isActive());
        return params;
    }

    /**
     * Mirrors every parameter into a message header of the same name.
     * NOTE(review): this includes {@code password}; confirm downstream endpoints do not
     * log or forward message headers.
     */
    private static void copyToHeaders(Exchange exchange, Map<String, Object> params) {
        for (Map.Entry<String, Object> entry : params.entrySet()) {
            exchange.getIn().setHeader(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Converts a column value to {@link java.sql.Timestamp}. Accepts {@code Timestamp},
     * {@link java.time.LocalDateTime} and {@link java.util.Date}; {@code null} stays
     * {@code null}. Any other type fails with {@link ClassCastException}, as before.
     */
    private static java.sql.Timestamp toTimestamp(Object value) {
        if (value instanceof java.time.LocalDateTime) {
            return java.sql.Timestamp.valueOf((java.time.LocalDateTime) value);
        }
        if (value instanceof java.util.Date && !(value instanceof java.sql.Timestamp)) {
            return new java.sql.Timestamp(((java.util.Date) value).getTime());
        }
        return (java.sql.Timestamp) value;
    }

    /**
     * Converts a column value to a Boolean flag. Numeric values are true when non-zero;
     * {@code null} stays {@code null}. Any other non-Boolean type fails with
     * {@link ClassCastException}, as before.
     */
    private static Boolean toBooleanFlag(Object value) {
        if (value instanceof Number) {
            return ((Number) value).intValue() != 0;
        }
        return (Boolean) value;
    }

    /** Immutable holder for normalised paging values. */
    private static final class PageRequest {
        final int page;
        final int size;
        final int offset;

        PageRequest(int page, int size, int offset) {
            this.page = page;
            this.size = size;
            this.offset = offset;
        }
    }
}