数据同步服务-语音
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

548 lines
21 KiB

1 week ago
package com.threecloud.dataserviceyy.service;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.threecloud.dataserviceyy.entity.MidVoiceCallRecord;
import com.threecloud.dataserviceyy.entity.MidVoiceChannelConfig;
import com.threecloud.dataserviceyy.mapper.MidVoiceCallRecordMapper;
import com.threecloud.dataserviceyy.mapper.VoiceSyncMapper;
import com.threecloud.dataserviceyy.service.channel.ChannelConfigService;
import com.threecloud.dataserviceyy.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.File;
import java.net.URLEncoder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* VAA录音盒定时同步服务
*
* 功能说明
* 1. 定时从 EBOX 录音盒拉取录音记录
* 2. 下载录音文件到本地保留10天
* 3. 上传录音文件到 OSS
* 4. 保存通话记录到 mid_voice_call_record
*
* 数据来源
* - 设备列表mid_voice_device_config
* - 通道配置优先从 EBOX API 获取失败则从数据库读取
*
* 同步策略
* - 每2小时执行一次cron: 0 0 0/2 * * ?
* - 增量同步根据上次同步时间只拉取新录音
* - 防重复根据 device_no + record_id 判断
*
* 目录结构
* vaa-recordings/
* 20240101/ # 按日期分目录
* <device_uuid>/ # 按设备分目录
* xxx.wav
* .sync-marker/ # 同步时间标记
* <device_id>.time
*/
@Service
public class VaaSyncService {
private static final Logger logger = LoggerFactory.getLogger(VaaSyncService.class);
// ==================== 依赖注入 ====================
@Autowired
private VoiceSyncMapper voiceSyncMapper;
@Autowired
private MidVoiceCallRecordMapper callRecordMapper;
@Autowired
private ChannelConfigService channelConfigService;
@Autowired
private VaaHttpUtil vaaHttpUtil;
@Autowired
private FileUploadUtil fileUploadUtil;
// ==================== 配置参数 ====================
/** 本地录音文件存储路径 */
@Value("${vaa-sync.download-path:./vaa-recordings}")
private String downloadPath;
/** 录音盒登录用户名 */
@Value("${vaa-sync.device-username:admin}")
private String deviceUsername;
/** 录音盒登录密码 */
@Value("${vaa-sync.device-password:admin}")
private String devicePassword;
/** 本地文件保留天数 */
@Value("${vaa-sync.retain-days:10}")
private int retainDays;
// ==================== 定时任务 ====================
/**
* 定时同步任务 - 每2小时执行一次
*
* cron表达式说明0 0 0/2 * * ?
* - 0
* - 0
* - 0/2 表示从0点开始每2小时
* - * 每天
* - * 每月
* - ? 不指定
*/
@Scheduled(cron = "${vaa-sync.sync-interval-cron:0 0 0/2 * * ?}")
public void scheduledSync() {
logger.info("【定时任务】========== VAA录音盒同步开始 ==========");
long startTime = System.currentTimeMillis();
executeSync();
long costTime = System.currentTimeMillis() - startTime;
logger.info("【定时任务】========== VAA录音盒同步结束,耗时 {} 秒 ==========", costTime / 1000);
}
// ==================== 核心同步逻辑 ====================
/**
* 执行同步任务主入口
*
* 执行流程
* 1. 清理过期本地文件
* 2. 查询所有在线设备
* 3. 逐个设备同步
*/
public void executeSync() {
logger.info("【主流程】开始执行VAA录音盒同步任务");
try {
// 步骤1:清理过期本地文件
logger.debug("【步骤1】清理 {} 天前的本地录音文件", retainDays);
FileCleaner.cleanOldFiles(downloadPath, retainDays);
// 步骤2:查询设备列表
logger.debug("【步骤2】查询在线设备列表");
List<Map<String, Object>> deviceList = voiceSyncMapper.getAllYysb();
logger.info("【主流程】查询到 {} 个语音设备", deviceList.size());
// 步骤3:逐个设备同步
int successCount = 0;
int failCount = 0;
for (int i = 0; i < deviceList.size(); i++) {
Map<String, Object> device = deviceList.get(i);
String deviceId = getStringValue(device, "ID");
logger.debug("【步骤3】处理第 {}/{} 个设备: ID={}", i + 1, deviceList.size(), deviceId);
try {
syncSingleDevice(device);
successCount++;
} catch (Exception e) {
failCount++;
logger.error("【异常】设备同步失败: ID={}, 原因={}", deviceId, e.getMessage());
}
}
logger.info("【主流程】同步完成,成功 {} 个设备,失败 {} 个设备", successCount, failCount);
} catch (Exception e) {
logger.error("【异常】同步任务执行失败: {}", e.getMessage(), e);
}
}
/**
* 同步单个设备
*
* @param device 设备信息Map包含
* - ID: 设备ID
* - UUID: 设备编号device_no
* - ORGAN_NAME: 地市名称
* - ORGAN_ID: 地市编码
* - IP: IP地址
* - PORT: 端口号
* - ORG_CODE: 单位代码
*/
private void syncSingleDevice(Map<String, Object> device) throws Exception {
// 提取设备信息
String deviceId = getStringValue(device, "ID");
String deviceNo = getStringValue(device, "UUID");
String cityName = getStringValue(device, "ORGAN_NAME");
String cityCode = getStringValue(device, "ORGAN_ID");
String ip = getStringValue(device, "IP");
Integer port = getIntValue(device, "PORT", 80);
String orgCode = getStringValue(device, "ORG_CODE");
logger.info("【设备】────────────────────────────────────────");
logger.info("【设备】开始同步设备: ID={}, 编号={}, 机构={}, IP={}:{}",
deviceId, deviceNo, cityName, ip, port);
// 参数校验
if (!StringUtils.hasText(ip)) {
logger.warn("【设备】设备IP为空,跳过同步: ID={}", deviceId);
return;
}
// 机构名称兜底
if (!StringUtils.hasText(cityName)) {
cityName = "unknown_" + deviceId;
}
// 构造设备访问地址
String deviceHost = buildDeviceHost(ip, port);
// 步骤1:登录认证
logger.debug("【设备-步骤1】登录设备: {}", deviceHost);
String authToken = loginDevice(deviceHost);
if (authToken == null) {
logger.error("【设备-异常】设备登录失败,跳过同步: IP={}", ip);
return;
}
logger.debug("【设备-步骤1】登录成功,获取到认证令牌");
// 步骤2:获取时间范围
TimeRange timeRange = calculateSyncTimeRange(deviceId);
logger.debug("【设备-步骤2】同步时间范围: {} 至 {}", timeRange.startTime, timeRange.endTime);
// 步骤3:获取录音列表
logger.debug("【设备-步骤3】获取录音列表...");
JSONArray records = fetchRecordList(deviceHost, authToken, timeRange);
if (records == null || records.isEmpty()) {
logger.info("【设备】设备 {} 没有新的录音记录", deviceId);
return;
}
logger.info("【设备】获取到 {} 条录音记录", records.size());
// 步骤4:处理每条录音
SyncResult result = processRecords(records, deviceNo, cityName, cityCode, orgCode, deviceHost, authToken);
// 步骤5:保存同步时间
if (result.latestCallTime != null) {
SyncTimeUtil.writeLastSyncTime(downloadPath, deviceId, result.latestCallTime);
logger.debug("【设备-步骤5】保存同步时间: {}", result.latestCallTime);
}
logger.info("【设备】同步完成: ID={}, 成功{}条, 失败{}条", deviceId, result.successCount, result.failCount);
}
// ==================== 私有辅助方法 ====================
/**
* 登录设备获取认证令牌
*
* @param deviceHost 设备访问地址 192.168.1.100:80
* @return 认证令牌Cookie失败返回null
*/
private String loginDevice(String deviceHost) {
try {
String loginUrl = String.format("http://%s/authorize?username=%s&password=%s",
deviceHost, urlEncode(deviceUsername), urlEncode(devicePassword));
logger.info("正在登录设备: {}", deviceHost);
String authToken = vaaHttpUtil.httpLogin(loginUrl);
logger.info("登录成功");
return authToken;
} catch (Exception e) {
logger.error("设备登录失败: {}, 原因={}", deviceHost, e.getMessage());
return null;
}
}
/**
* 计算同步时间范围
*
* @param deviceId 设备ID
* @return 时间范围Unix时间戳
*/
private TimeRange calculateSyncTimeRange(String deviceId) {
Date lastSyncTime = SyncTimeUtil.readLastSyncTime(downloadPath, deviceId);
Date now = new Date();
// 如果没有上次同步时间,或超过1天,则从昨天开始
if (lastSyncTime == null || DateUtil.getDateDoubleDiff(now, lastSyncTime) > 1.0) {
lastSyncTime = DateUtil.addDayByDate(now, -1);
}
return new TimeRange(lastSyncTime.getTime() / 1000, now.getTime() / 1000);
}
/**
* 获取录音列表
*
* @param deviceHost 设备访问地址
* @param authToken 认证令牌
* @param timeRange 时间范围
* @return 录音记录JSON数组
*/
private JSONArray fetchRecordList(String deviceHost, String authToken, TimeRange timeRange) {
try {
String recordUrl = String.format("http://%s/service/record/~/time[%d,%d]",
deviceHost, timeRange.startTime, timeRange.endTime);
logger.debug("获取录音列表: {}", recordUrl);
String recordData = vaaHttpUtil.httpVisit(recordUrl, authToken);
return vaaHttpUtil.parseRecordData(recordData);
} catch (Exception e) {
logger.error("获取录音列表失败: {}", e.getMessage());
return null;
}
}
/**
* 批量处理录音记录
*/
private SyncResult processRecords(JSONArray records, String deviceNo, String cityName,
String cityCode, String orgCode, String deviceHost, String authToken) {
SyncResult result = new SyncResult();
for (int i = 0; i < records.size(); i++) {
JSONObject record = records.getJSONObject(i);
try {
boolean success = processSingleRecord(record, deviceNo, cityName, cityCode, orgCode, deviceHost, authToken);
if (success) {
result.successCount++;
// 跟踪最新的通话时间
Date callTime = parseCallTime(record);
if (callTime != null && (result.latestCallTime == null || callTime.after(result.latestCallTime))) {
result.latestCallTime = callTime;
}
} else {
result.failCount++;
}
} catch (Exception e) {
logger.error("处理录音记录失败[{}]: {}", i, e.getMessage());
result.failCount++;
}
}
return result;
}
/**
* 处理单条录音记录
*
* 处理流程
* 1. 解析录音信息
* 2. 检查是否已存在防重复
* 3. 查询通道配置获取本机号码
* 4. 下载录音文件如不存在
* 5. 上传到OSS
* 6. 保存到数据库
*/
private boolean processSingleRecord(JSONObject record, String deviceNo, String cityName,
String cityCode, String orgCode, String deviceHost, String authToken) throws Exception {
// ========== 步骤1:解析录音信息 ==========
String recordId = RecordParser.parseRecordId(record);
String filePath = RecordParser.parseFilePath(record);
Integer channel = RecordParser.parseChannel(record);
String phone = RecordParser.parsePhone(record);
boolean isOutgoing = RecordParser.isOutgoing(record);
boolean isAnswered = RecordParser.isAnswered(record);
Long begTime = RecordParser.parseBegTime(record);
Long endTime = RecordParser.parseEndTime(record);
logger.debug("【录音】处理记录: recordId={}, channel={}, phone={}, direction={}",
recordId, channel, phone, isOutgoing ? "呼出" : "呼入");
// 参数校验
if (filePath == null || filePath.isEmpty()) {
logger.debug("【录音】录音文件路径为空,跳过: recordId={}", recordId);
return false;
}
if (begTime == null || endTime == null) {
logger.warn("【录音-异常】录音时间信息缺失,跳过: recordId={}", recordId);
return false;
}
// ========== 步骤2:防重复检查 ==========
String callRecordId = deviceNo + "_" + recordId;
if (callRecordMapper.selectByCallRecordId(callRecordId) != null) {
logger.debug("【录音】通话记录已存在,跳过: {}", callRecordId);
return true;
}
// ========== 步骤3:准备文件路径(按地市分文件夹) ==========
Date callStartTime = new Date(begTime * 1000);
String fileName = FilePathUtil.extractFileName(filePath);
// 本地路径: {basePath}/{cityCode}/{date}/{uuid}/{fileName}
String localPath = FilePathUtil.buildLocalPath(downloadPath, cityCode, callStartTime, deviceNo, fileName);
Path localFile = Paths.get(localPath);
// OSS路径: {cityCode}/{date}/{fileName}
String ossPath = FilePathUtil.buildOssPath(cityCode, callStartTime, fileName);
logger.debug("【录音-步骤3】路径信息: localPath={}, ossPath={}", localPath, ossPath);
logger.debug("【录音】文件信息: fileName={}, localPath={}", fileName, localPath);
// ========== 步骤4:查询通道配置 ==========
logger.debug("【录音-步骤4】查询通道配置: deviceNo={}, channel={}", deviceNo, channel);
MidVoiceChannelConfig channelConfig = channelConfigService.getChannelConfig(
deviceNo, channel, deviceHost, "EBOX-8108", authToken);
String channelPhone = channelConfig != null ? channelConfig.getPhoneNumber() : null;
logger.debug("【录音-步骤4】通道配置: channelPhone={}", channelPhone);
// ========== 步骤5:构建通话记录实体 ==========
MidVoiceCallRecord callRecord = buildCallRecord(
callRecordId, deviceNo, cityCode, cityName, orgCode,
fileName, localPath, callStartTime, new Date(endTime * 1000),
(int) (endTime - begTime), channelPhone, phone, isOutgoing, isAnswered);
// ========== 步骤6:下载录音文件 ==========
if (!Files.exists(localFile) || Files.size(localFile) == 0) {
logger.info("【录音-步骤6】开始下载录音: {}", fileName);
String fileUrl = "http://" + deviceHost + filePath;
vaaHttpUtil.httpDown(fileUrl, localPath, authToken);
logger.info("【录音-步骤6】录音下载完成: {} ({} 字节)", localPath, Files.size(localFile));
} else {
logger.debug("【录音-步骤6】录音文件已存在本地,跳过下载: {} ({} 字节)",
fileName, Files.size(localFile));
}
callRecord.setRecordingFileSize((int) Files.size(localFile));
// ========== 步骤7:上传到OSS ==========
logger.debug("【录音-步骤7】开始上传OSS: cityName={}, ossPath={}", cityName, ossPath);
byte[] wavData = Files.readAllBytes(localFile);
String ossUrl = fileUploadUtil.uploadWav(cityName, ossPath, fileName, wavData);
logger.info("【录音-步骤7】录音上传OSS成功: {}", ossUrl);
callRecord.setRecordingFilePath(ossUrl);
// ========== 步骤8:保存到数据库 ==========
logger.debug("【录音-步骤8】保存通话记录到数据库...");
callRecordMapper.insert(callRecord);
logger.info("【录音-步骤8】通话记录保存成功: id={}, callRecordId={}", callRecord.getId(), callRecordId);
return true;
}
/**
* 构建通话记录实体
*/
private MidVoiceCallRecord buildCallRecord(String callRecordId, String deviceNo, String cityCode,
String cityName, String orgCode, String fileName,
String localPath, Date callStartTime, Date callEndTime,
int duration, String channelPhone, String remotePhone,
boolean isOutgoing, boolean isAnswered) {
MidVoiceCallRecord record = new MidVoiceCallRecord();
// 基础信息
record.setCallRecordId(callRecordId);
record.setDeviceNo(deviceNo);
record.setCityCode(cityCode);
record.setCityName(cityName);
record.setOrgCode(orgCode);
// 文件信息
record.setRecordingFileName(fileName);
record.setLocalPath(localPath);
// 时间信息
record.setCallStartTime(callStartTime);
record.setCallEndTime(callEndTime);
record.setCallDuration(duration);
// 通话方向:1呼入,2呼出
record.setCallDirection(isOutgoing ? "2" : "1");
// 主叫/被叫号码
String localNumber = channelPhone != null && !channelPhone.isEmpty() ? channelPhone : "";
String remoteNumber = remotePhone != null ? remotePhone : "";
if (isOutgoing) {
// 呼出:本机打给对方
record.setCallTel(localNumber); // 主叫:本机号码
record.setCalledTel(remoteNumber); // 被叫:对方号码
} else {
// 呼入:对方打给本机
record.setCallTel(remoteNumber); // 主叫:对方号码
record.setCalledTel(localNumber); // 被叫:本机号码
}
// 通话状态
record.setCallStatus(isAnswered ? "1" : "2"); // 1正常接通,2未接通
return record;
}
// ==================== 工具方法 ====================
/**
* 构建设备访问地址
*/
private String buildDeviceHost(String ip, Integer port) {
return ip + (port != null && port != 80 ? ":" + port : "");
}
/**
* 解析通话时间
*/
private Date parseCallTime(JSONObject record) {
Long begTime = RecordParser.parseBegTime(record);
return begTime != null ? new Date(begTime * 1000) : null;
}
/**
* 从Map中获取字符串值
*/
private String getStringValue(Map<String, Object> map, String key) {
Object value = map.get(key);
return value != null ? value.toString() : null;
}
/**
* 从Map中获取整数值
*/
private Integer getIntValue(Map<String, Object> map, String key, Integer defaultValue) {
Object value = map.get(key);
if (value == null) {
return defaultValue;
}
try {
return Integer.parseInt(value.toString());
} catch (NumberFormatException e) {
return defaultValue;
}
}
/**
* URL编码
*/
private String urlEncode(String value) throws Exception {
return URLEncoder.encode(value, "UTF-8");
}
// ==================== 内部类 ====================
/**
* 时间范围
*/
private static class TimeRange {
final long startTime;
final long endTime;
TimeRange(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
}
@Override
public String toString() {
return String.format("[%d, %d]", startTime, endTime);
}
}
/**
* 同步结果
*/
private static class SyncResult {
int successCount = 0;
int failCount = 0;
Date latestCallTime = null;
}
}