viewtopic.php?f=74&t=169183&start=0
正好这两天学习python,就给这个收音机程序加上了GUI,因为是用python写的第一个程序,写得不好,请见谅。GPLv3协议,欢迎修改。
因为程序很简单,就不做deb了,使用方法如下
1)gedit spradio.py
2)拷贝最下面的代码到spradio.py文件中,并保存
3)chmod 700 spradio.py
4)sudo ./spradio.py (第一次运行要用root权限运行,因为要安装依赖的包,原因见原帖)
或者也可以运行这条命令,手工把需要的包装上
代码: 全选
sudo apt-get install gstreamer0.10-plugins-bad gstreamer0.10-plugins-ugly gstreamer0.10-plugins-good python-gst python-lxml python-wxgtk2.6
代码: 全选
#!/usr/bin/env python
# --*-- encoding:utf-8 --*--
# Spradio, simple network radio player
# Copyright (C) <2009> Devin & Shrek Zhou
# License GPLv3
from urllib2 import urlopen
from urllib import quote, unquote
from lxml import etree
import pygst
pygst.require("0.10")
import gst
import gobject
import sys
import os
import wx
# Module-level handle to the running wx application; 0 until GuiInit() runs.
theApp = 0


def GuiInit():
    """Create the wx application object and run its event loop.

    Blocks until the GUI main loop exits.
    """
    # Without the ``global`` declaration the assignment below would only bind
    # a function-local name and the module-level ``theApp`` would stay 0.
    global theApp
    theApp = App()
    theApp.MainLoop()
class App(wx.App):
    """wx application object that builds and shows the main player window."""

    def OnInit(self):
        """Create the main frame, show it, and display a usage hint.

        Returns True so that wx continues into the main loop.
        """
        self.SetAppName(u'Spradio')

        main_window = Frame()
        self.frame = main_window
        main_window.SetSize((430, 300))
        main_window.CenterOnScreen()
        main_window.Show()
        self.SetTopWindow(main_window)

        # The channel list starts empty; tell the user how to populate it.
        wx.MessageBox(u'请点击"刷新"键获得广播频道列表', u'提示', wx.ICON_INFORMATION)
        return True
class Frame(wx.Frame):
    """Main window: a channel list plus stop / refresh / quit controls."""

    # Class-level default so OnStop() is safe before the first refresh.
    radio = None

    # Characters left untouched when percent-encoding a stream URL.  Besides
    # ':' and '/', the query/fragment delimiters and '%' must survive, or a
    # URL such as ``mms://host/path?id=1`` would be turned into a bogus path.
    _URL_SAFE = ":/?&=%#@+,;"

    def __init__(self, parent=None):
        """Build the widgets, the menu bar and the layout."""
        wx.Frame.__init__(self, parent, title=u'网络广播播放器')
        # Channel name -> stream URL; replaced by OnRefreshList.
        self.radios = {}
        self.panel = wx.Panel(self)
        self.buttonStop = wx.Button(self.panel, wx.ID_ANY, u'停止(&T)')
        self.buttonStop.Bind(wx.EVT_BUTTON, self.OnStop)
        self.buttonRefresh = wx.Button(self.panel, wx.ID_ANY, u'刷新(&R)')
        self.buttonRefresh.Bind(wx.EVT_BUTTON, self.OnRefreshList)
        self.buttonQuit = wx.Button(self.panel, wx.ID_ANY, u'退出(&Q)')
        self.buttonQuit.Bind(wx.EVT_BUTTON, self.OnCloseWindow)
        self.statusBar = self.CreateStatusBar()
        self.createMenuBar()
        self.radioList = wx.ListCtrl(self.panel, style=wx.LC_REPORT | wx.SUNKEN_BORDER)
        self.radioList.InsertColumn(0, u'频道号', wx.LIST_FORMAT_CENTER)
        self.radioList.InsertColumn(1, u'频道名称', wx.LIST_FORMAT_CENTER, 330)
        self.radioList.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.OnListItemActivated)
        # Route the title-bar close button through the same shutdown path as
        # the quit button / menu item so the player is stopped as well.
        self.Bind(wx.EVT_CLOSE, self.OnCloseWindow)
        self._do_layout()

    def _do_layout(self):
        """Arrange the list above a centred, horizontal row of buttons."""
        sizer_1 = wx.BoxSizer(wx.VERTICAL)
        sizer_2 = wx.BoxSizer(wx.VERTICAL)
        sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
        sizer_2.Add(self.radioList, 1, wx.EXPAND, 0)
        sizer_3.Add(self.buttonStop, 0, 0, 0)
        sizer_3.Add(self.buttonRefresh, 0, 0, 0)
        sizer_3.Add(self.buttonQuit, 0, 0, 0)
        sizer_2.Add(sizer_3, 0, wx.ALIGN_CENTER_HORIZONTAL, 0)
        self.panel.SetSizer(sizer_2)
        sizer_1.Add(self.panel, 1, wx.EXPAND, 0)
        self.SetSizer(sizer_1)
        sizer_1.Fit(self)
        self.Layout()

    def menuData(self):
        """Return the menu description.

        A list of ``(menu label, items)`` pairs where each item is a
        ``(label, status-bar help, handler)`` triple.
        """
        return [(u"系统(&S)", (
                    (u"刷新(&R)", u"获得广播频道列表", self.OnRefreshList),
                    (u"停止(&T)", u"停止播放器", self.OnStop),
                    (u"退出(&Q)", u"退出播放器", self.OnCloseWindow))),
                (u"帮助(&H)", (
                    (u"关于(&A)", u"显示关于对话框", self.OnAbout),
                ))]

    def createMenuBar(self):
        """Build the menu bar from menuData() and attach it to the frame."""
        menuBar = wx.MenuBar()
        for menuLabel, menuItems in self.menuData():
            menuBar.Append(self.createMenu(menuItems), menuLabel)
        self.SetMenuBar(menuBar)

    def createMenu(self, menuData):
        """Build one wx.Menu from a sequence of item tuples.

        A 2-tuple ``(label, items)`` becomes a nested sub-menu; any other
        tuple is passed on to createMenuItem().
        """
        menu = wx.Menu()
        for eachItem in menuData:
            if len(eachItem) == 2:
                label = eachItem[0]
                subMenu = self.createMenu(eachItem[1])
                menu.AppendMenu(wx.NewId(), label, subMenu)
            else:
                self.createMenuItem(menu, *eachItem)
        return menu

    def createMenuItem(self, menu, label, status, handler,
                       kind=wx.ITEM_NORMAL):
        """Append one entry to *menu*; an empty label adds a separator."""
        if not label:
            menu.AppendSeparator()
            return
        menuItem = menu.Append(-1, label, status, kind)
        self.Bind(wx.EVT_MENU, handler, menuItem)

    def OnAbout(self, event):
        """Show the About dialog."""
        info = wx.AboutDialogInfo()
        info.SetCopyright(u'Copyright (C) 2009 Devin & Shrek Zhou')
        info.SetDescription(u'Spradio是非常简单的网络广播播放器')
        info.SetVersion(u'0.0.1')
        info.SetLicense(u'GPLv3')
        info.SetWebSite(u"http://www.ubuntu.org.cn")
        wx.AboutBox(info)

    def OnRefreshList(self, event):
        """Stop playback, download the channel list and repopulate the view."""
        self.OnStop(None)
        self.SetStatusText(u'正在接收广播频道列表,请稍候...')
        self.radio = BaiduRadio()
        self.radios = self.radio.getradiolist()
        # Drop the rows of any earlier refresh; otherwise every refresh
        # appends a second copy of the whole list.
        self.radioList.DeleteAllItems()
        for idx, name in enumerate(sorted(self.radios)):
            self.radioList.Append([idx + 1, name])
        if self.radioList.GetItemCount() > 0:
            self.SetStatusText(u'接收广播频道列表成功,双击列表中的频道开始播放')
        else:
            self.SetStatusText(u'接收广播频道列表失败,请检查网络是否通畅')

    def OnListItemActivated(self, event):
        """Start playing the channel whose row was double-clicked."""
        item = event.GetItem()
        # Column 0 holds the 1-based position in the sorted channel names.
        sortedkey = sorted(self.radios)[int(item.GetText()) - 1]
        self.OnStop(None)
        self.SetStatusText(u'开始尝试播放频道:' + sortedkey)
        try:
            self.radio.playradio(
                quote(self.radios[sortedkey].encode('utf-8'), safe=self._URL_SAFE))
        except Exception:
            # Stop first: OnStop() writes its own status text, which would
            # otherwise hide the failure message.
            self.OnStop(None)
            self.SetStatusText(u'播放频道:' + sortedkey + u'失败')
            return
        self.SetStatusText(u'正在播放频道:' + sortedkey)

    def OnStop(self, event):
        """Stop the player (if one exists) and update the status bar."""
        if self.radio is not None:
            self.radio.stop()
        self.SetStatusText(u'播放已经停止,双击列表中的频道重新开始播放')

    def OnCloseWindow(self, event):
        """Stop playback and destroy the window."""
        if self.radio is not None:
            self.radio.stop()
        self.Destroy()
# The bare string below is a module-level note left by the original author;
# it is an expression statement with no effect at runtime.
'''
Parse and get radio list from baidu. play radio with gstreamer.
author:Shrek Zhou(zgwmike@hotmail.com)
'''
# ASX playlist URL that BaiduRadio.parseBBC() hands to AsxParser.
# NOTE(review): the host part consists of nine placeholder characters,
# presumably masked when this code was posted -- confirm the real host and
# restore it; as written the lookup cannot resolve.
BBC_LIVE = u'http://www.囗囗囗囗囗囗囗囗囗/worldservice/meta/tx/nb/live_infent_au_nb.asx'
class AsxParser(object):
    """Resolve an ASX playlist URL into a flat list of stream URLs.

    ``asxfile`` is either the playlist URL itself or the path of a local
    file whose content is that URL.  Nested ``.asx`` references are followed
    recursively; every other ``<ref href=...>`` is collected in ``playlist``.
    """

    def __init__(self, asxfile):
        if os.path.isfile(asxfile):
            # The local file only stores the URL of the real playlist.
            with open(asxfile) as handle:
                self.asxsrc = handle.read().strip()
        else:
            self.asxsrc = asxfile
        self.playlist = []
        # Playlist URLs already fetched; protects against reference cycles.
        self._visited = set()

    def parse(self):
        """Fetch ``self.asxsrc`` and return the accumulated playlist.

        Errors raised while fetching or parsing propagate to the caller.
        """
        if self.asxsrc in self._visited:
            # A playlist that (directly or indirectly) references itself
            # would otherwise recurse without bound.
            return self.playlist
        self._visited.add(self.asxsrc)

        response = urlopen(self.asxsrc)
        try:
            markup = response.read()
        finally:
            response.close()

        root = etree.HTML(markup)
        if root is None:
            # Nothing parseable came back, hence nothing to add.
            return self.playlist

        for ref in root.xpath(u".//ref"):
            href = ref.get(u'href', u'')
            if not href:
                continue
            if href.endswith(u'.asx'):
                self.asxsrc = href
                self.parse()
            else:
                self.playlist.append(href)
        return self.playlist
class BaiduRadio(object):
    """Channel directory plus a GStreamer ``playbin`` used to play a channel."""

    rooturl = u'http://list.mp3.baidu.com/radio/iframe.html'
    M_PLAY = u'Play'
    M_RECORD = u'Record'

    def __init__(self):
        """Seed the channel map with the BBC entry and create the player."""
        # Channel name -> stream URL.
        self.radios = {}
        self.parseBBC()
        self.player = gst.element_factory_make(u'playbin', u'player')
        bus = self.player.get_bus()
        bus.add_signal_watch()
        bus.connect(u'message', self.on_message)

    def parseBBC(self):
        """Add the first stream of the BBC_LIVE playlist, if it can be read.

        Best effort: any failure is printed and otherwise ignored so that the
        remaining channels stay usable.
        """
        try:
            asxparser = AsxParser(BBC_LIVE)
            asxparser.parse()
            if asxparser.playlist:
                self.radios[u'BBC World Service'] = asxparser.playlist[0]
        except Exception as e:
            print(e)

    def on_message(self, bus, message):
        """Bus callback: print each message and reset the player when done.

        On end-of-stream or error the pipeline is set back to NULL so that
        it releases the stream instead of staying in a dead PLAYING state.
        """
        print(message)
        if message.type == gst.MESSAGE_EOS or message.type == gst.MESSAGE_ERROR:
            self.player.set_state(gst.STATE_NULL)

    def getradiolist(self):
        """Scrape ``rooturl`` for mms/rtsp links and return the channel map.

        Returns the ``self.radios`` dict (channel name -> stream URL), which
        also keeps whatever parseBBC() stored earlier.
        """
        root = readurl(self.rooturl, u'gb2312')
        for a in root.xpath(u".//a"):
            # Strip before testing so that padded hrefs are recognised too.
            href = (a.get(u'href') or u'').strip()
            if not href.startswith((u'mms', u'rtsp')):
                continue
            # Anchors without text would yield a None key, which callers
            # cannot sort or concatenate into a status message.
            name = (a.text or u'').strip()
            if not name:
                continue
            self.radios[name] = href
        return self.radios

    def playradio(self, radiourl, mode=M_PLAY, savefile=u"/home/shrek/radio.ogg"):
        """Start playing *radiourl*.

        With ``mode == M_RECORD`` the audio is additionally written to
        *savefile* through a RecorderBin.  The default *savefile* is kept
        for backward compatibility; pass an explicit path when recording.
        """
        self.player.set_property(u'uri', radiourl)
        if mode == self.M_RECORD:
            self.player.set_property(u'audio-sink', RecorderBin(savefile))
        self.player.set_state(gst.STATE_PLAYING)

    def stop(self):
        """Stop playback by setting the pipeline to the NULL state."""
        self.player.set_state(gst.STATE_NULL)
def readurl(url, encoding=None):
    """Download *url* and return it as a parsed lxml HTML tree.

    url      -- address to fetch.
    encoding -- character set of the page, or None to let lxml work on the
                raw bytes.  ``gb2312`` and ``gbk`` are decoded with their
                superset ``gb18030`` so pages that use characters beyond
                the declared set still decode.

    Never raises for download, decode or parse problems: they are logged
    and an empty ``<html>`` tree is returned (or, for a decode failure, the
    raw bytes are parsed instead).
    """
    # Imported here because the module does not define a logger of its own.
    import logging
    log = logging.getLogger(__name__)
    empty_page = u'<html></html>'

    try:
        con = urlopen(url)
        try:
            resp = con.read()
        finally:
            # Close the connection even when read() fails.
            con.close()
    except Exception:
        log.exception(u'error while reading url %s', url)
        return etree.HTML(empty_page)

    markup = resp
    if encoding is not None:
        codec = encoding
        if encoding.lower() in (u'gb2312', u'gbk'):
            codec = u'gb18030'
        try:
            markup = resp.decode(codec)
        except (UnicodeError, LookupError):
            # Undecodable bytes or unknown codec: fall back to raw bytes.
            log.exception(u'error decode html page %s', url)

    try:
        root = etree.HTML(markup)
    except Exception:
        log.exception(u'error while reading url %s', url)
        root = None
    if root is None:
        # Callers invoke .xpath() on the result, so always hand back a tree.
        return etree.HTML(empty_page)
    return root
class RecorderBin(gst.Bin):
    """Audio sink bin that plays through ALSA while writing an Ogg/Vorbis file.

    Layout: audioconvert -> tee, with one tee branch going to
    vorbisenc -> oggmux -> filesink and the other to
    queue -> audioconvert -> alsasink.
    """

    def __init__(self, output):
        """Build the bin; *output* is the location given to the filesink."""
        self.__gobject_init__()

        # Input side: convert, then split the stream.
        converter = gst.element_factory_make(u"audioconvert", u"converter")
        splitter = gst.element_factory_make(u"tee")
        self.add(converter, splitter)
        converter.link(splitter)

        # Recording branch.
        encoder = gst.element_factory_make(u"vorbisenc", u"vorbis-encoder")
        muxer = gst.element_factory_make(u"oggmux", u"ogg-create")
        file_sink = gst.element_factory_make(u"filesink", u"sink")
        file_sink.set_property(u"location", output)
        self.add(encoder, muxer, file_sink)
        gst.element_link_many(encoder, muxer, file_sink)

        # Playback branch.
        playback_conv = gst.element_factory_make(u'audioconvert')
        playback_queue = gst.element_factory_make(u'queue')
        playback_sink = gst.element_factory_make(u'alsasink')
        self.add(playback_conv, playback_queue, playback_sink)

        # Attach both branches to the tee: recording first, then playback.
        splitter.link(encoder)
        gst.element_link_many(splitter, playback_queue, playback_conv, playback_sink)

        # Expose the converter's sink pad as the sink pad of the whole bin.
        bin_sink = gst.GhostPad(u'sink', converter.get_pad(u'sink'))
        self.add_pad(bin_sink)
def verifyPkgs():
    """Mark the required distribution packages for installation via apt.

    Already installed packages are only reported.  The apt transaction is
    committed only when at least one package was actually marked.
    """
    pkgs = u'gstreamer0.10-plugins-bad gstreamer0.10-plugins-ugly gstreamer0.10-plugins-good python-gst python-lxml python-wxgtk2.6'
    import apt
    c = apt.cache.Cache(apt.progress.OpTextProgress())
    marked = False
    for pkg in pkgs.split(' '):
        if not c.has_key(pkg):
            print(u'%s does not exists!' % pkg)
            continue
        p = c[pkg]
        print(u'=======================')
        if p.isInstalled:
            print(u'%s is already installed!' % pkg)
            continue
        # Announce an installation only for packages that really need one.
        print(u'installing %s' % p.name)
        p.markInstall()
        marked = True
    if marked:
        c.commit(apt.progress.TextFetchProgress(), apt.progress.InstallProgress())
if __name__ == u'__main__':
    import pwd

    # Installing packages needs root, so only attempt it when the effective
    # user is root; the GUI is started in either case.
    effective_user = pwd.getpwuid(os.geteuid()).pw_name
    if effective_user == u'root':
        verifyPkgs()

    gobject.threads_init()
    GuiInit()