-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain_with_sync.py
More file actions
312 lines (265 loc) · 14.4 KB
/
main_with_sync.py
File metadata and controls
312 lines (265 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
import gradio as gr
import numpy as np
from PIL import Image
import requests
import os
import tempfile
import subprocess
import librosa # 用于音频处理
import soundfile as sf
import matplotlib.pyplot as plt
# 定义同步头参数
sync_freq=1000
sync_duration=0.1
def add_sync_header_to_audio(audio_path, sync_freq, sync_duration):
"""
在音频前添加同步头
Args:
audio_filepath: 输入音频文件路径
sync_freq: 同步头频率(Hz)
sync_duration: 同步头持续时间(秒)
Returns:
temp_audio_with_sync_path: 带同步头的临时音频文件路径
"""
try:
# 读取原始音频文件
data, sample_rate = librosa.load(audio_path, sr=None)
print(f"[调试] 原始音频文件采样率: {sample_rate}, 长度: {len(data)} 采样点")
# 生成同步头(使用与原始音频相同的采样率)
t = np.arange(int(sync_duration * sample_rate)) / sample_rate
sync_wave = 0.8 * np.sin(2 * np.pi * sync_freq * t) # 振幅0.8,防止削波
print(f"[调试] 同步头长度: {len(sync_wave)} 采样点")
# 保持通道数一致
if data.ndim > 1:
sync_wave = np.tile(sync_wave[:, np.newaxis], (1, data.shape[1]))
# 将同步头与原始音频拼接
new_data = np.concatenate([sync_wave, data], axis=0)
print(f"[调试] 添加同步头后的音频长度: {len(new_data)} 采样点")
# # 保存为临时文件
# with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
# temp_audio_with_sync_path = tmp_file.name
# # 写入音频文件
# sf.write(temp_audio_with_sync_path, new_data, sample_rate)
# print(f"[+] 已添加同步头,临时文件保存为: {temp_audio_with_sync_path}")
# return temp_audio_with_sync_path
return new_data, sample_rate
except Exception as e:
print(f"[错误] 添加同步头时发生错误: {str(e)}")
# 如果添加同步头失败,返回原始文件路径
return audio_path
def plot_waveform_and_spectrum(audio_path):
data, samplerate = sf.read(audio_path)
if data.ndim > 1:
data = data[:, 0]
t = np.arange(len(data)) / samplerate
# 时域波形
fig1, ax1 = plt.subplots(figsize=(8, 2))
ax1.plot(t, data)
ax1.set_title("The time-domain waveform of the received signal")
ax1.set_xlabel("Time (s)")
ax1.set_ylabel("Amplitude")
plt.tight_layout()
tmp_waveform = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
plt.savefig(tmp_waveform.name, format='png')
plt.close(fig1)
# 频谱
fft_data = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1/samplerate)
fig2, ax2 = plt.subplots(figsize=(8, 2))
ax2.plot(freqs[:len(freqs)//2], np.abs(fft_data)[:len(freqs)//2])
ax2.set_title("Received signal spectrum")
ax2.set_xlabel("Frequency (Hz)")
ax2.set_ylabel("Amplitude")
plt.tight_layout()
tmp_spectrum = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
plt.savefig(tmp_spectrum.name, format='png')
plt.close(fig2)
return tmp_waveform.name, tmp_spectrum.name
# New function to send audio to server and get a reconstructed image
def reconstruct_image_from_audio(audio_filepath):
if audio_filepath is None:
return "请先上传一个WAV音频文件。", None
temp_reconstructed_image_path = None
# Server URL for audio decoding to image
server_url = "http://jscc.wille.homjay.com:55000/decode_audio"
# Placeholder for the path where the decoded image will be saved (locally, before Gradio handles it)
# decoded_image_path = "temp_decoded_image.png" # Using NamedTemporaryFile is better
try:
print(f"[*] 正在将音频解码为图像:{audio_filepath} → Server")
with open(audio_filepath, 'rb') as audio_file_obj:
files = {'audio': (os.path.basename(audio_filepath), audio_file_obj, 'audio/wav')}
response = requests.post(server_url, files=files, timeout=60) # Increased timeout for potentially larger data
response.raise_for_status()
# Save the received image content to a temporary file
# Assuming the server returns a common image format like PNG or JPEG
# Forcing .png as a common, safe default. Server should ideally send Content-Type.
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_image_file_obj:
tmp_image_file_obj.write(response.content)
temp_reconstructed_image_path = tmp_image_file_obj.name
print(f"[+] 解码完成,重建图像文件已临时保存为:{temp_reconstructed_image_path}")
return f"音频解码成功!图像已重建。", temp_reconstructed_image_path
except requests.exceptions.Timeout:
return f"调用解码服务器超时 ({server_url}). 请检查服务器状态或网络连接。", None
except requests.exceptions.HTTPError as e:
error_detail = f"服务器响应内容: {e.response.text[:200]}" if e.response else "无详细响应内容"
return f"解码服务器返回错误: {e.response.status_code} - {e.response.reason}. {error_detail}", None
except requests.exceptions.RequestException as e:
return f"调用解码服务器时发生网络错误: {str(e)}", None
except Exception as e:
import traceback
print(f"处理音频解码时发生未知错误: {traceback.format_exc()}")
return f"处理音频解码时发生未知错误: {str(e)}", None
finally:
# Gradio will make a copy of temp_reconstructed_image_path if it's returned.
# So, we can attempt to clean it up here.
if temp_reconstructed_image_path and os.path.exists(temp_reconstructed_image_path):
try:
os.remove(temp_reconstructed_image_path)
print(f"[+] 已清理临时重建图像: {temp_reconstructed_image_path}")
except Exception as e_clean:
print(f"[!] 清理临时重建图像失败 {temp_reconstructed_image_path}: {e_clean}")
# encode_image_with_server 函数 - 输入UI将允许摄像头
def encode_image_with_server(image_pil_for_encode):
if image_pil_for_encode is None:
# 更新了提示信息以反映摄像头选项
return "请先上传图片或使用摄像头拍照。", None, None, None
temp_image_path = None
temp_audio_path = None
temp_audio_with_sync_path = None
server_url_encode = "http://jscc.wille.homjay.com:55000/encode_image"
try:
# 保存图片到临时文件
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_image_file_obj:
image_pil_for_encode.save(tmp_image_file_obj.name, "PNG")
temp_image_path = tmp_image_file_obj.name
print(f"[*] 正在将图像编码为音频:{temp_image_path} → {server_url_encode}")
# 发送图片到服务器,获取音频内容
with open(temp_image_path, 'rb') as img_file:
files = {'image': (os.path.basename(temp_image_path), img_file, 'image/png')}
response = requests.post(server_url_encode, files=files, timeout=30)
response.raise_for_status()
# 1. 保存服务器返回的原始编码音频到指定路径
encoded_signal_path = "/home/dlh/UI/encoded_signal.wav"
with open(encoded_signal_path, "wb") as f:
f.write(response.content)
print(f"[+] 服务器编码完成,音频文件已保存为:{encoded_signal_path}")
# 2. 为编码后的音频添加同步头,并保存到指定路径
print("[*] 正在为编码后的音频添加同步头...")
sync_output_path = "/home/dlh/UI/encoded_signal_sync.wav"
# 读取原始音频
# audio_data, original_sample_rate = librosa.load(encoded_signal_path, sr=None)
# sync_header = generate_sync_header(sample_rate=original_sample_rate)
# audio_with_sync = np.concatenate([sync_header, audio_data])
# sf.write(sync_output_path, audio_with_sync, original_sample_rate)
# 使用新的 add_sync_header_to_audio 函数
audio_with_sync, sample_rate = add_sync_header_to_audio(encoded_signal_path, sync_freq, sync_duration)
# 保存带同步头的音频
sf.write(sync_output_path, audio_with_sync, sample_rate)
print(f"[+] 同步头添加完成,带同步头音频文件:{sync_output_path}")
# 生成波形图和频谱图
print("[*] 正在生成波形图和频谱图...")
waveform_path, spectrum_path = plot_waveform_and_spectrum(sync_output_path)
print(f"[+] 波形图和频谱图已生成: {waveform_path}, {spectrum_path}")
status_msg = f"图像编码成功!\n- 原始编码音频: {encoded_signal_path}\n- 带同步头音频: {sync_output_path}"
return status_msg, sync_output_path, waveform_path, spectrum_path
except requests.exceptions.Timeout:
return f"调用编码服务器超时 ({server_url_encode}). 请检查服务器状态或网络连接。", None, None, None
except requests.exceptions.HTTPError as e:
error_detail = f"服务器响应内容: {e.response.text[:200]}" if e.response else "无详细响应内容"
return f"编码服务器返回错误: {e.response.status_code} - {e.response.reason}. {error_detail}", None, None, None
except requests.exceptions.RequestException as e:
return f"调用编码服务器时发生网络错误: {str(e)}", None, None, None
except Exception as e:
import traceback
print(f"处理图片编码时发生未知错误: {traceback.format_exc()}")
return f"处理图片编码时发生未知错误: {str(e)}", None, None, None
finally:
# 清理临时图片文件
if temp_image_path and os.path.exists(temp_image_path):
try:
os.remove(temp_image_path)
print(f"[+] 已清理临时图片: {temp_image_path}")
except Exception as e_clean:
print(f"[!] 清理临时图片失败 {temp_image_path}: {e_clean}")
# 在后台运行指定的 Python 脚本
def run_audio_script_background():
script_to_run = "wav_to_wav_transmit_new.py"
python_executable = "python"
command = [python_executable, script_to_run]
try:
# 使用 Popen 执行命令并捕获输出
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 将stderr重定向到stdout
text=True, # 使用文本模式,自动解码输出
bufsize=1, # 行缓冲
universal_newlines=True
)
# 读取输出直到进程结束
output_lines = []
while True:
line = process.stdout.readline()
if not line and process.poll() is not None:
break
if line:
output_lines.append(line.strip())
print(f"[脚本输出] {line.strip()}") # 在控制台也打印输出
# 等待进程结束并获取返回码
return_code = process.wait()
# 整理输出信息
output_text = "\n".join(output_lines)
if return_code == 0:
return f"信号文件发送成功!\n\n脚本输出日志:\n{output_text}"
else:
return f"信号文件发送失败(返回码:{return_code})\n\n脚本输出日志:\n{output_text}"
except FileNotFoundError:
return f"错误: 无法找到 '{python_executable}' 命令或脚本 '{script_to_run}'。\n请确保 Python 已安装并在 PATH 中,且脚本存在。"
except Exception as e:
return f"启动脚本时发生错误: {str(e)}"
# 创建 Gradio Blocks 界面
with gr.Blocks() as demo:
gr.Markdown("# 基于语义通信的宽带短波图传系统") # 用户更新的应用标题
with gr.Tab("发送端"): # 用户更新的选项卡名称
gr.Markdown("上传一张图片或使用摄像头拍摄一张照片,然后点击\"图像编码\"按钮,将得到对图像进行编码后的信号文件。") # 用户更新的描述
with gr.Row():
with gr.Column(scale=1):
encode_image_input = gr.Image(type="pil", label="上传图片或拍照", sources=["upload", "webcam"], height=392)
encode_button = gr.Button("图像编码")
encode_output_text = gr.Textbox(label="编码状态", lines=3, interactive=False)
encode_audio_output = gr.Audio(label="编码后的信号文件", type="filepath")
with gr.Column(scale=1):
waveform_output = gr.Image(label="时域波形图", type="filepath", interactive=False, height=188)
spectrum_output = gr.Image(label="信号频谱图", type="filepath", interactive=False, height=188)
run_script_button = gr.Button("发送信号文件")
script_status_output = gr.Textbox(label="发送日志(log)", lines=3, interactive=False)
# 事件处理器
encode_button.click(
fn=encode_image_with_server,
inputs=encode_image_input,
outputs=[encode_output_text, encode_audio_output, waveform_output, spectrum_output]
)
run_script_button.click(
fn=run_audio_script_background,
inputs=None,
outputs=script_status_output
)
with gr.Tab("接收端"): # 用户更新的选项卡名称
gr.Markdown("上传接收到的WAV音频文件,然后点击\"图像重建\"按钮查看效果。")
with gr.Row():
with gr.Column(scale=1):
# Audio 输入, 仅允许上传
decode_audio_input = gr.Audio(type="filepath", label="上传WAV音频文件 (解码)", sources=["upload"])
decode_button = gr.Button("图像重建")
with gr.Column(scale=1):
# 更新了标签以更准确地反映其内容
decode_output_text = gr.Textbox(label="解码状态", lines=5, interactive=False)
# 将原来的 Audio 输出更改为 Image 输出以显示重建的图像
reconstructed_image_output = gr.Image(label="重建后的图像", type="filepath")
decode_button.click(
fn=reconstruct_image_from_audio, # 使用新的后端函数
inputs=decode_audio_input,
outputs=[decode_output_text, reconstructed_image_output] # 更新输出组件
)
if __name__ == "__main__":
demo.launch()