#-----------------------------------------------------------------------------
# Name:        usbADC.py
# Purpose:     Set up and control the TI ads1271 device
#
# Author:      RJS
#
# Created:     2004/14/12
# RCS-ID:      $Id: c55ads1271.py $
# Copyright:   (c) 2005
# Licence:
#-----------------------------------------------------------------------------
""" module to use USB ADC devices with Python """

import ConfigParser
import usb
import struct
import time
import os
import sys

#-------------------------------------------------------------------------------
# module constants
#-------------------------------------------------------------------------------
## Vendor type transfers with a recipient of Interface
HOST2DEVICE = int('01000001',2) #65
DEVICE2HOST = int('11000001',2) #193
# interface is 1, for now, any # can be used!
INTERFACE = 1
WRAP = 2.**23
BITS24 = 2.**24
## endpoint that the MMB0 expects the firmware to be written to
ADS1271BULKOUT = 0x06
ADS1271EVMBULKIN = 0x81

#---------------------------------------------------------------------------
#
#   module methods - device independent
#
#---------------------------------------------------------------------------
def FindDevices():
    """
    Enumerate every device on every USB bus.

    Returns a list of tuples
        (idVendor, string descriptor 1, idProduct, string descriptor 2,
         iSerialNumber)
    A string descriptor that cannot be read is reported as ''.
    """
    found = []      # created once, so devices from all busses are kept
    for bus in usb.busses():
        for dev in bus.devices:
            deviceHandle = dev.open()
            try:
                vendorString = deviceHandle.getString(1, 100)
            except Exception:
                vendorString = ''
            try:
                productString = deviceHandle.getString(2, 100)
            except Exception:
                productString = ''
            found.append((dev.idVendor, vendorString,
                          dev.idProduct, productString,
                          dev.iSerialNumber))
    return found


def _decode24(sampleBytes):
    """
    Combine three big-endian bytes into an unsigned 24-bit integer.

    Each byte is masked with 0xFF, so it does not matter whether the USB
    layer delivered the bytes as signed (-128..127) or unsigned (0..255)
    integers.  Raises struct.error when fewer or more than 3 bytes are given.
    """
    if len(sampleBytes) != 3:
        raise struct.error('a 24-bit sample needs 3 bytes, got %d'
                           % len(sampleBytes))
    high, middle, low = [b & 0xFF for b in sampleBytes]
    return (high << 16) | (middle << 8) | low


def _scaleSample(raw, gain, dcOffset):
    """
    Convert an unsigned 24-bit code into a scaled float.

    Codes at or above WRAP (2**23) are the negative half of the two's
    complement range.  The result is the *negated* signed value divided by
    2**23, multiplied by gain, minus dcOffset.  '>=' is used so that code
    0x800000 maps to +gain, continuous with 0x800001.
    """
    if raw >= WRAP:
        fraction = (BITS24 - raw) / WRAP
    else:
        fraction = -raw / WRAP
    return fraction * gain - dcOffset


def _convStrToFloats(ans, gainFactors=[1.,1.], DCFactors=[0.,0.], numChannels=2):
    """
    Convert raw sample bytes into interleaved, scaled floats.

    ans:         sequence of integer byte values, 3 bytes per sample,
                 channels interleaved.
    gainFactors: per-channel multiplier.
    DCFactors:   per-channel offset subtracted after scaling.
    numChannels: number of interleaved channels.

    Returns a list of per-sample lists -> [[ch1, ch2],[ch1, ch2],...]
    Raises struct.error if the data ends in the middle of a sample.
    """
    data = []
    for i in range(0, len(ans), 3*numChannels):    ## 3 bytes per samp
        chValues = []
        for thisCh in range(numChannels):
            start = i + 3*thisCh
            raw = _decode24(ans[start:start+3])
            chValues.append(_scaleSample(raw, gainFactors[thisCh],
                                         DCFactors[thisCh]))
        data.append(chValues)
    return data


def convStrToFloats(ans, gainFactors=[1.,1.], DCFactors=[0.,0.], numChannels=2):
    """
    Convert raw sample bytes into per-channel lists of scaled floats.

    Parameters are the same as for _convStrToFloats.  A sample that cannot
    be decoded (for example a truncated trailing sample) is treated as raw
    code 0, matching the previous best-effort behaviour.

    Returns a list of channel lists -> [[ch1, ch1,...],[ch2, ch2,...],...]
    with exactly numChannels entries.
    """
    data = [[] for _ in range(numChannels)]
    for byteN in range(0, len(ans), 3*numChannels):    ## 3 bytes per samp
        for thisCh in range(numChannels):
            start = byteN + 3*thisCh
            try:
                chValue = _decode24(ans[start:start+3])
            except (struct.error, TypeError):
                chValue = 0
            data[thisCh].append(_scaleSample(chValue, gainFactors[thisCh],
                                             DCFactors[thisCh]))
    return data


def read(dev, numbytes, ep=ADS1271EVMBULKIN, timeout = 100):
    """
    read(dev, numbytes, ep, timeout = 100) -> data
    Read data from usb device.

    dev:      USB device handle (must provide bulkRead).
    numbytes: number of bytes to read.
    ep:       bulk IN endpoint address.
    timeout:  per-transfer timeout passed to bulkRead.

    Keeps issuing bulk reads until at least numbytes bytes have been
    collected, so more than numbytes may be returned.  There is no overall
    time limit.  Any exception from bulkRead is reported and re-raised.
    """
    ans = ()
    while True:
        time.sleep(.001)    ## needed on Debian machine? it is faster, some errors seen
        try:
            ans += dev.bulkRead(ep, numbytes, timeout)
        except Exception:
            print('usb ERROR %s' % (sys.exc_info()[0],))
            raise
        if len(ans) >= numbytes:
            break
    return ans


def write(dev, buffer, ep=ADS1271BULKOUT, timeout = 100):
    """
    write(dev, buffer, ep, timeout = 100) -> written
    Write data to usb device.

    dev:     USB device handle (must provide bulkWrite).
    buffer:  data buffer.
    ep:      bulk OUT endpoint address.
    timeout: operation timeout.

    Returns whatever dev.bulkWrite returns.
    """
    return dev.bulkWrite(ep, buffer, timeout)


def writeFirmware(dev, fileName):
    """
    Write new firmware to usb device.

    dev:      USB device object (not yet opened).
    fileName: path of the firmware image, sent to ADS1271BULKOUT.

    Raises SystemExit(1) when the firmware file cannot be read.  A failure
    to open the device is reported and re-raised.  A failed bulk write is
    only reported; the claimed interface is released in every case.
    """
    try:
        with open(fileName, 'rb') as f:
            firmware = f.read()
    except IOError as err:
        print("I/O error(%s): %s: %s" % (err.errno, err.strerror, fileName))
        raise SystemExit(1)
    except Exception:
        print("IO error: couldn't read in the firmware: " + fileName)
        raise SystemExit(1)
    print("Read in firmware: " + fileName)

    try:
        devHandle = dev.open()
    except Exception:
        # without a handle nothing below can work, so do not carry on
        print("MMB0 open error. ")
        raise
    devHandle.setConfiguration(dev.configurations[0])
    devHandle.claimInterface(0)
    try:
        write(devHandle, firmware, ep=ADS1271BULKOUT, timeout = 500)
        print("wrote to firmware: " + fileName)
    except Exception as err:
        print("USB write error: %s: %s Are you root?" % (err, fileName))
    finally:
        devHandle.releaseInterface()


def PrintDevInfo(dev):
    """Print device information: descriptor fields, configurations,
    interfaces, alternate settings and endpoints of dev."""
    print("Device: %s" % (dev.filename,))
    print(" Device class: %s" % (dev.deviceClass,))
    print(" Device sub class: %s" % (dev.deviceSubClass,))
    print(" Device protocol: %s" % (dev.deviceProtocol,))
    print(" Max packet size: %s" % (dev.maxPacketSize,))
    print(" idVendor: %s" % (dev.idVendor,))
    print(" idProduct: %s" % (dev.idProduct,))
    print(" Device Version: %s" % (dev.deviceVersion,))
    print(" Device SerialNumber: %s" % (dev.iSerialNumber,))
    for config in dev.configurations:
        print(" Configuration: %s" % (config.value,))
        print(" Total length: %s" % (config.totalLength,))
        print(" selfPowered: %s" % (config.selfPowered,))
        print(" remoteWakeup: %s" % (config.remoteWakeup,))
        print(" maxPower: %s" % (config.maxPower,))
        for intf in config.interfaces:
            print(" Interface: %s" % (intf[0].interfaceNumber,))
            for alt in intf:
                print(" Alternate Setting: %s" % (alt.alternateSetting,))
                print(" Interface class: %s" % (alt.interfaceClass,))
                print(" Interface sub class: %s" % (alt.interfaceSubClass,))
                print(" Interface protocol: %s" % (alt.interfaceProtocol,))
                for ep in alt.endpoints:
                    print(" Endpoint: %s" % (hex(ep.address),))
                    print(" Type: %s" % (ep.type,))
                    print(" Max packet size: %s" % (ep.maxPacketSize,))
                    print(" Interval: %s" % (ep.interval,))

#---------------------------------------------------------------------------
#
#   device specific classes
#
#---------------------------------------------------------------------------
class c55ads1271:
    """
    definition of an ads1271 USB ADC

    Usage Ex:
        try:
            a2d=usbADC.c55ads1271(0)
        except Exception as err:
            print("errorz: %s" % (err,))

    -configure device for current settings
    -find the appropriate device
    -load firmware, if needed
    -claim its interface

    Notes: call: .self.deviceHandle.releaseInterface() as needed
    """

    # conversion-mode codes used by getConvMode / setConvMode
    _CONV_MODES = {0: 'high speed', 1: 'high resolution', 2: 'low power'}

    def __init__(self, serialNumber=0):
        """
        Read the config file, load firmware if a boot loader is waiting,
        then open and claim the ads1271evm.

        serialNumber: currently unused; this needs to be extended for
                      multiple ads1271 devices, with iSerialNumber.

        Raises Exception(1001, text) when the board is not idle and
        Exception(1002, text) when no ads1271evm is found.
        """
        ## read the device's config file and set the vars
        self.init_config('c55ads1271.cfg')

        ## check that the firmware is loaded
        devMMB0 = self.FindMMB0BootLoader()
        if devMMB0 != 0:
            print("Found an MMB0 waiting for firmware")
            writeFirmware(devMMB0,
                          str(self.config.get('main', 'ads1271binfilename')))

        ## wait (up to 1 s) for the 1271 to come back up...
        t0 = time.time()
        while time.time()-t0 < 1:
            devads1271evm = self.Find1271evm()
            if devads1271evm != 0:
                break
            time.sleep(.1)

        if devads1271evm == 0:
            print('ads1271evm not found!\n')
            raise Exception(1002, 'ads1271evm not found')

        print("Found ads1271evm\n")
        self.device = devads1271evm
        config = devads1271evm.configurations[0]
        self.deviceHandle = devads1271evm.open()
        self.deviceHandle.setConfiguration(config)
        # a failure to claim the interface propagates to the caller
        self.deviceHandle.claimInterface(0)
        self.deviceHandle.setAltInterface(0)

        ## check for running or stuck ADC
        blocksAvail = self.getBlocksAvail()
        boardState = self.getBoardState()[0]
        print('init - blksAvail: %s boardState %s' % (blocksAvail, boardState))
        if boardState != 0:
            s = 'blocksAvail='+str(blocksAvail)+', '+\
                'boardState='+str(boardState)+\
                '; reset needed'
            raise Exception(1001, s)
        self.blockSize = self.getBlockSize()
        self.getSampRate()
        self.activeChannels = 2

    def init_config(self, fileName):
        """
        Read the configuration file and set the acquisition settings.

        Sets self.config, self.gainFactors, self.DCFactors and
        self.numChannels from section 'aquisition'.
        Raises IOError when fileName cannot be opened.
        """
        ## start the Configparser module
        self.config = ConfigParser.ConfigParser()
        try:
            fileHandle = open(fileName, 'r')
        except IOError:
            raise IOError("File open error:\n"+fileName+"\n" +\
                          "not found in\n"+ os.getcwd())
        fileHandle.close()
        self.config.read(fileName)
        self.gainFactors = [float(x) for x in
                            self.config.get('aquisition', 'gainFactors').split(',')]
        self.DCFactors = [float(x) for x in
                          self.config.get('aquisition', 'DCFactors').split(',')]
        self.numChannels = int(self.config.get('aquisition', 'numChannels'))

    #---------------------------------------------------------------------------
    #
    #   helper methods
    #
    #---------------------------------------------------------------------------
    def _findDevice(self, vendorOption, productOption):
        """
        Return the first USB device whose idVendor/idProduct match the hex
        values stored under section 'main' of the config, or 0 if none.
        """
        # int(x, 16) accepts the leading '0x' the config values carry
        vendorId = int(self.config.get('main', vendorOption), 16)
        productId = int(self.config.get('main', productOption), 16)
        for bus in usb.busses():
            for dev in bus.devices:
                if dev.idVendor == vendorId and dev.idProduct == productId:
                    return dev
        return 0

    def _queryUInt(self, request, numBytes):
        """
        Issue a vendor IN control request and decode the reply as an
        unsigned big-endian integer of numBytes bytes.

        Bytes are masked with 0xFF so signed and unsigned byte values from
        the USB layer decode identically.  Returns None when the transfer
        raises or the reply does not have numBytes bytes.
        """
        try:
            res = self.deviceHandle.controlMsg(DEVICE2HOST, request=request,
                                               buffer=numBytes, index=INTERFACE)
        except Exception:
            return None
        if len(res) != numBytes:
            return None
        value = 0
        for b in res:
            value = (value << 8) | (b & 0xFF)
        return value

    def FindMMB0BootLoader(self):
        """Return the MMB0 boot-loader device (config options
        ads1271Vendor / ads1271Product), or 0 when not present."""
        return self._findDevice('ads1271Vendor', 'ads1271Product')

    def Find1271evm(self):
        """Return the running ads1271evm device (config options
        ads1271evmVendor / ads1271evmProduct), or 0 when not present."""
        return self._findDevice('ads1271evmVendor', 'ads1271evmProduct')

    def getData(self, numChannels=2):  #test version
        """
        Fetch all available blocks from the ADC and return them as
        per-channel lists of floats (see convStrToFloats).

        NOTE(review): this method was damaged in the source this file was
        recovered from.  Everything up to the computation of numBytes is
        original; the bulk-read loop after it is reconstructed from the
        surviving tail of what appears to have been a second, older
        acquisition method whose name and signature were lost.  That second
        method has NOT been restored.  Verify against an intact revision.
        """
        print("")
        getBlocksAvail = self.getBlocksAvail
        blockSize = self.blockSize
        n = 1   #blocks, todo...
        ans = ()

        ## check for running ADC
        boardState = self.getBoardState()[0]
        blocksAvail = getBlocksAvail()
        if boardState == 4:
            # still in RDATA
            print('bs %s %s' % (boardState, blocksAvail))
        elif boardState == 2 and blocksAvail < 2:
            print('boardState %s blksAvl %s need RDATA' % (boardState, blocksAvail))
            self.deviceHandle.controlMsg(HOST2DEVICE, request=0x04,
                                         buffer=struct.pack(">L", n),
                                         index=INTERFACE)
        else:
            print('self.getBoardState()[0] %s blksavail %s'
                  % (boardState, blocksAvail))

        ## wait up to 1 s (wall clock) for at least two blocks
        tryNum = 0
        print('try: blksAvl:')
        t0 = time.time()
        while time.time()-t0 < 1:
            blocksAvail = getBlocksAvail()
            if blocksAvail >= 2:
                break
            print('%s   %s waiting... %s'
                  % (tryNum, blocksAvail, .5*self.blockSize/self.sampleRate))

        ## send an RDATA for all blocks
        self.deviceHandle.controlMsg(HOST2DEVICE, request=0x04,
                                     buffer=struct.pack(">L", blocksAvail),
                                     index=INTERFACE)
        numBytes = blockSize*3*numChannels*(blocksAvail)

        ## loop until a full block arrives or it's been too long...
        # NOTE(review): reconstructed loop, see docstring.
        while True:
            try:
                res = self.deviceHandle.bulkRead(ADS1271EVMBULKIN,
                                                 numBytes+10000, 100)
            except Exception as err:
                print("USB error: %s" % (err,))
                res = ()
            ans += res
            print('len res: %s total: %s' % (len(res), len(ans)))
            tryNum += 1
            bytesReceived = len(ans)
            if bytesReceived >= numBytes or tryNum > 2:
                break

        if bytesReceived != numBytes:
            sys.stdout.write('bad USB %s ' % (bytesReceived,))
        print(len(ans))
        return convStrToFloats(ans, self.gainFactors, self.DCFactors, numChannels)

    #-------------------------------------------------------------------------------
    #
    #   ADC methods
    #
    #-------------------------------------------------------------------------------
    def getBoardState(self):
        """
        Query the board state.

        Always returns a tuple (stateNumber, description).  stateNumber is
        -1 when the USB transfer fails or returns no data.
        """
        stateDict = {0: 'IDLE = idle',
                     1: 'ADCSTT = starting ADC',
                     2: 'ADCRUN = ADC running',
                     3: 'ADCSTP = stopping ADC',
                     4: 'RDATA = retrieving data',
                     5: 'TSTRAM = testing RAM',
                     8: 'TSTFLASH = testing flash',
                     86: 'ADSERR = ADC start error (IDLE)'}
        try:
            state = self.deviceHandle.controlMsg(DEVICE2HOST, request=0x00,
                                                 buffer=1, index=INTERFACE)
        except Exception as err:
            print("get board state error: %s " % (err,))
            # callers index the result, so the error value is a tuple too
            return (-1, 'an error occured')
        if state == ():
            print('\nboard error: %s' % (state,))
            return (-1, 'an error occured')
        elif state[0] > 128:
            print('\nboardState error: %s' % (state[0],))
            return (state[0], 'an error occured')
        else:
            return (state[0], stateDict.get(state[0], 'unknown state'))

    def getBufLen(self):
        """Return the device buffer length (request 0x09), or -1 on error."""
        bl = self._queryUInt(0x09, 4)
        if bl is None:
            return -1
        print('bufLen result: %s ( %s )' % (bl, hex(bl)))
        return bl

    def getSampRate(self):
        """
        Read the sample rate (request 0x10), store it in self.sampleRate
        and return it.  The device value is divided by 1000 and printed as
        Hz.  Returns -1 on error, leaving self.sampleRate untouched.
        """
        raw = self._queryUInt(0x10, 4)
        if raw is None:
            return -1
        sr = raw/1000.
        print('samp rate: %s Hz' % (sr,))
        self.sampleRate = sr
        return sr

    def setSampRate(self, rateReq):
        """
        Request a new sample rate (same units as the raw device value, i.e.
        1000 x the value getSampRate reports) and return the rate read back
        from the device, or -1 when the request fails.
        """
        rate = struct.pack(">L", int(rateReq))
        try:
            self.deviceHandle.controlMsg(HOST2DEVICE, request=0x10,
                                         buffer=rate, index=INTERFACE)
        except Exception:
            return -1
        sys.stdout.write('rate req: %s , set to ' % (float(rateReq)/1000.,))
        time.sleep(.05)
        return self.getSampRate()

    def getBlocksAvail(self):
        """Return the number of data blocks available (request 0x07),
        or -1 on error."""
        ba = self._queryUInt(0x07, 4)
        if ba is None:
            return -1
        return ba

    def getBlockSize(self):
        """Read the block size (request 0x0A), store it in self.blockSize
        and return it.  Returns -1 on error."""
        bs = self._queryUInt(0x0A, 2)
        if bs is None:
            return -1
        print('blockSize is: %s' % (bs,))
        self.blockSize = bs
        return bs

    def setBlockSize(self, size):
        """
        Set the block size; only possible while the board is idle.

        size: must be <= 2**16-1
        Returns the block size read back from the device, or -1 when the
        board is not idle or the request fails.
        """
        if self.getBoardState()[0] != 0:
            print('can\'t set blockSize while not idle - state: %s'
                  % (self.getBoardState(),))
            return -1
        try:
            self.deviceHandle.controlMsg(HOST2DEVICE, request=0x0A, buffer=2,
                                         value=size, index=INTERFACE)
        except Exception as err:
            print("set Blocksize error: %s " % (err,))
            return -1
        gbs = self.getBlockSize()
        print('blockSize requested: %s , set to: %s' % (size, gbs))
        return gbs

    def getConvMode(self):
        """Return the conversion-mode code (request 0x0B; see _CONV_MODES),
        or -1 on error."""
        try:
            res = self.deviceHandle.controlMsg(DEVICE2HOST, request=0x0B,
                                               buffer=1, value=0,
                                               index=INTERFACE)
        except Exception as err:
            print("get Conversion error: %s " % (err,))
            return -1
        return res[0]

    def setConvMode(self, mode):
        """Set the conversion mode and return the mode read back from the
        device, or -1 when the request fails."""
        try:
            self.deviceHandle.controlMsg(HOST2DEVICE, request=0x0B, buffer='',
                                         value=mode, index=INTERFACE)
        except Exception as err:
            print("set Conversion error: %s " % (err,))
            return -1
        mode = self.getConvMode()
        print('mode set to: %s %s' % (mode, self._CONV_MODES.get(mode, 'unknown')))
        return mode

    def startADC(self):
        """
        Start the ADC if the board is idle.

        Returns 2 once the board reports ADCRUN (self.startTime is set),
        the current state number if the board was not idle or did not
        start within 1 s (a stop request is then sent), or -1 when the
        start request fails.
        """
        sys.stdout.write('starting ADC: ')
        bs = self.getBoardState()
        if bs[0] != 0:
            print('not IDLE %s' % (bs[0],))
            return bs[0]
        try:
            self.deviceHandle.controlMsg(HOST2DEVICE, request=0x02, buffer=4,
                                         index=INTERFACE)
        except Exception:
            return -1
        t0 = time.time()
        while time.time()-t0 < 1:
            bs = self.getBoardState()
            if bs[0] == 2:
                self.startTime = time.clock()
                print("%s" % (bs,))
                return bs[0]
        ## timed out, stop
        self.deviceHandle.controlMsg(HOST2DEVICE, request=0x03, buffer=4,
                                     index=INTERFACE)
        print('timed out!')
        return bs[0]

    def stopADC(self):
        """
        Stop the ADC.

        Returns 0 once the board reports IDLE, -1 when the stop request or
        a state query fails, otherwise the last state seen after 0.5 s.
        """
        sys.stdout.write('stopping ADC: ')
        try:
            self.deviceHandle.controlMsg(HOST2DEVICE, request=0x03, buffer=4,
                                         index=INTERFACE)
        except Exception as err:
            print("stop error: %s " % (err,))
            return -1
        t0 = time.time()
        while time.time()-t0 < .5:
            bs = self.getBoardState()
            if bs[0] in (0, -1):
                print("%s" % (bs,))
                return bs[0]
        ## timed out - some other state
        print('timed out!')
        return bs[0]

    def testSDRAM(self):
        """Start the SDRAM test (request 240) when the board is idle and
        poll its progress until the first reply byte equals 1."""
        ##test for IDLE
        if self.getBoardState()[0] == 0:
            print("%s" % (self.deviceHandle.controlMsg(HOST2DEVICE, request=240,
                                                       buffer='',
                                                       index=INTERFACE),))
            while True:
                time.sleep(.001)
                res = self.deviceHandle.controlMsg(DEVICE2HOST, request=240,
                                                   buffer=3, index=INTERFACE)
                # NOTE(review): 3 bytes are requested but index 3 (a 4th
                # byte) is read - confirm the reply length with the firmware.
                print('%s KB pages tested' % (res[3]*128,))
                if res[0] == 1:
                    break