自动切换音频输出设备python脚本

Totem,mplayer,sopcast,realplayer,bmp
回复
soiamso
帖子: 418
注册时间: 2008-09-06 2:00
送出感谢: 0
接收感谢: 0

自动切换音频输出设备python脚本

#1

帖子 soiamso » 2013-02-03 14:45

通过监听PulseAudio, DBus上的事件,再通过pacmd进行切换

代码: 全选

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
   需要修改 /etc/pulse/default.pa以下部分
   加载PulseAudio 对DBus的支持模块
   
    ### Load DBus protocol
    .ifexists module-dbus-protocol.so
    load-module module-dbus-protocol
    .endif
    
   sink与port的名字可以用 pacmd list-sinks 找到
   根据名字修改 honey_sink_pattern 以及 honey_portpattern
   
   default_signal_receiver
   默认情况是有线耳机(前置接口)与USB声卡/耳机的切换
   1.调节音量就会切换到调节的设备,
   2.插入有线耳机就会切换到有线耳机
   3.拔出其中一个设备切换到另外一个设备

   通过 AudioLogic/add_signal_receiver
   添加额外功能
   
   前置接口检测:
   https://wiki.ubuntu.com/Audio/PreciseJackDetectionTesting
   
   PulseAudio 前置接口接口路径适配
   /usr/share/pulseaudio/alsa-mixer/paths/analog-output-headphones.conf
   [Jack Front Headphone]
   required-any = any
   
"""

import sys
import dbus
import os
import subprocess
import re
from functools import partial

if sys.version_info >= (3,):
    from gi.repository import GObject
if sys.version_info < (3,):
    import gobject

from dbus.mainloop.glib import DBusGMainLoop


class AudioLogic(object):
    _conn = None
    
    def __new__(cls):
        assert AudioLogic._conn == None, "too many AudioLogic instance"
        return super(AudioLogic, cls).__new__(cls)
        
    def __init__(self,honey_port_pattern=r"headphones",honey_sink_pattern=r"Usb|usb|USB"):
        """
            honey_port_pattern 喜欢使用的port的名字含有的字符
            honey_sink_pattern 喜欢使用的的sink的名字含有的字符
            
        """
        DBusGMainLoop(set_as_default=True)
        
        self.pacore = None
        self.nulldevice = None
        self.honey_port_pattern = honey_port_pattern
        self.honey_sink_pattern = honey_sink_pattern
        self.fallback_sink = None
        self.nulldevice = open(os.devnull, 'w')        
        
        AudioLogic._conn = AudioLogic.connect()
        self.conn = AudioLogic._conn
        
        self.pacore = self.conn.get_object(object_path = "/org/pulseaudio/core1", introspect=False)
        self.pacore.ListenForSignal("", dbus.Array(signature='o'))
        
        
        if self.fallback_sink == None:
            self.fallback_sink = self.pacore.Get("org.PulseAudio.Core1","FallbackSink")

        assert self.fallback_sink != None, "There must have a fallback_sink"
    
    def default_signal_receiver(self):
        self.conn.add_signal_receiver(self.on_active_port_updated, signal_name="ActivePortUpdated")
        self.conn.add_signal_receiver(self.on_volume_updated, signal_name="VolumeUpdated", path_keyword="path")
        
    def add_signal_receiver(self,*args,**kargs):
        self.conn.add_signal_receiver(*args,**kargs)
    
    @staticmethod    
    def connect():
        if 'PULSE_DBUS_SERVER' in os.environ:
            address = os.environ['PULSE_DBUS_SERVER']
        else:
            bus = dbus.SessionBus()
            server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1")
            address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address")
    
        return dbus.connection.Connection(address)
    
    
    def get_playback_streams_index(self):
        """
            get all curent playback_streams
        """
        streams = self.pacore.Get("org.PulseAudio.Core1","PlaybackStreams")
        index = []
        for i in streams:
            tmp = self.conn.get_object("org.PulseAudio.Core1.Stream", i, introspect=False)            
            index.append(tmp.Get("org.PulseAudio.Core1.Stream", "Index"))
        return index
    
    
    @property    
    def honey_sink_names(self):
        """
            return a honey_sink list if no honey_sink retuen fallback_sink list
        """
        sinks = self.pacore.Get("org.PulseAudio.Core1","Sinks")
        names = []
        for i in sinks:
            tmp = self.conn.get_object("org.PulseAudio.Core1.Device", i, introspect=False)
            sinkname = tmp.Get("org.PulseAudio.Core1.Device", "Name")
            if re.search(self.honey_sink_pattern, sinkname):           
                names.append(sinkname)
        if len(names)==0:
            return self.fallback_sink
        return names
        
        
    def on_volume_updated(self, _, path=None):
        """
            when VolumnUpdated, move all playbackstream to
            the singnal emmiter (the sink)
        """
        sourceindex = self.get_playback_streams_index()      
        tmp = self.conn.get_object("org.PulseAudio.Core1.Device", path)
        sinkname = tmp.Get("org.PulseAudio.Core1.Device","Name")
        for i in sourceindex:
            subprocess.call(["pacmd", "move-sink-input", str(i), sinkname], stdout=self.nulldevice)


        
    def on_active_port_updated(self, port):
        
        """
            when ActivePortUpdated, check port name
            if it's honey_port, move all playbackstreams to it
            and set that sink as default.  
            if not, and no fallback_sink move all playbackstreams to honey_sink
            otherwise will move to fallback_sink
        """
        sourceindex = self.get_playback_streams_index()
        sink_tmp = self.conn.get_object("org.PulseAudio.Core1.Device", os.path.dirname(port), introspect=False)
        sinkname = sink_tmp.Get("org.PulseAudio.Core1.Device","Name")
        port_tmp = self.conn.get_object("org.PulseAudio.Core1.DevicePort", port, introspect=False)
        portname = port_tmp.Get("org.PulseAudio.Core1.DevicePort", "Name")
        if re.search(self.honey_port_pattern, portname) :
            for i in sourceindex:
                subprocess.call(["pacmd", "set-default-sink", sinkname], stdout=self.nulldevice)
                subprocess.call(["pacmd", "move-sink-input", str(i), sinkname], stdout=self.nulldevice)
        else :
            print(portname)
            for i in sourceindex:
                subprocess.call(["pacmd", "set-default-sink", self.honey_sink_names[0]], stdout=self.nulldevice)
                subprocess.call(["pacmd", "move-sink-input", str(i), self.honey_sink_names[0]], stdout=self.nulldevice)


def on_new_sink(obj, sink):
    
    """
        when get a newsink, move all playbackstream to the new sink
        and set that sink as default
        print "Catch NewSink ",sink
    """

    sourceindex = obj.get_playback_streams_index()
    sink_tmp = obj.conn.get_object("org.PulseAudio.Core1.Device", sink, introspect=False)
    sinkname = sink_tmp.Get("org.PulseAudio.Core1.Device","Name")
    for i in sourceindex:
        subprocess.call(["pacmd", "set-default-sink", sinkname], stdout=obj.nulldevice)
        subprocess.call(["pacmd", "move-sink-input", str(i), sinkname], stdout=obj.nulldevice)


if __name__ == "__main__":
    
    if sys.version_info >= (3,):
       loop = GObject.MainLoop()
    if sys.version_info < (3,):
       loop = gobject.MainLoop() 
    
    al = AudioLogic()
    al.default_signal_receiver()
    al.add_signal_receiver(partial(on_new_sink,al), signal_name="NewSink")
    loop.run()
附件
audiologic.py
(7.05 KiB) 下载 37 次
头像
eexpress
帖子: 58428
注册时间: 2005-08-14 21:55
来自: 长沙
送出感谢: 4 次
接收感谢: 256 次

Re: 自动切换音频输出设备python脚本

#2

帖子 eexpress » 2013-02-03 14:48

像模像样的。只是最后调用process,显得不完整了。lol
● 鸣学
回复

回到 “影音多媒体”