diff --git a/Create_Intensity_Lookup.py b/Create_Intensity_Lookup.py index 8313154..d8ec01a 100644 --- a/Create_Intensity_Lookup.py +++ b/Create_Intensity_Lookup.py @@ -1,3 +1,4 @@ +# coding=utf-8 """ ////////////////////////////////////////////////////////////////////////////////// // OpenNX4 - Open source firmware for Barco NX4 tiles @@ -8,51 +9,51 @@ // You may not use this code or a derived work for commercial purposes unless you have a commercial license, contact drtune@gmail.com ////////////////////////////////////////////////////////////////////////////////// """ +from __future__ import division, print_function from NX4CommsHeaderReader import NX4 +# output Xilinx ISE "COE" file (coeffs) to be includes in the ram 8b->12b pixel lookup +with open("ipcore_dir/Intensity_Lookup.coe", "w") as fout: -#output Xilinx ISE "COE" file (coeffs) to be includes in the ram 8b->12b pixel lookup -fout=open("ipcore_dir/Intensity_Lookup.coe","wt") + # http://jared.geek.nz/2013/feb/linear-led-pwm -#http://jared.geek.nz/2013/feb/linear-led-pwm + bits = NX4.INTERNAL_PIXEL_WIDTH_BITS -bits=NX4.INTERNAL_PIXEL_WIDTH_BITS + res = """ + ;GENERATED BY CREATE_INTENSITY_LOOKUP.PY DO NOT EDIT + ; Generated for a 256x{:d} dual port BRAM + memory_initialization_radix=16; + memory_initialization_vector= + """.format(bits) -res=""" -;GENERATED BY CREATE_INTENSITY_LOOKUP.PY DO NOT EDIT -; Generated for a 256x%d dual port BRAM -memory_initialization_radix=16; -memory_initialization_vector= -""" % bits + fullScale = 1 << bits -fullScale=1<>bits - else: - gamma = round(cie1931(float(n)/255)*(fullScale-1)) - - if n==0: - gamma=0 - out.append( gamma ) - res+=(",".join([ "%03x" % n for n in out])) - -res+=";\n" - -print >>fout,res -print res -fout.close() + for n in range(256): + # adjust for eye response + if False: + gamma = pow(fullScale - 1, (n / 256.0)) + gamma = int(gamma * fullScale) >> bits + else: + gamma = round(cie1931(float(n) / 255) * (fullScale - 1)) + + if n == 0: + gamma = 0 + out.append(gamma) + res += (",".join(["{:03x}".format(n) for n in out])) + + res += ";\n" + + print(res, file=fout) + print(res) diff --git a/NX4Comms.py b/NX4Comms.py index 7db8549..10202a8 100644 --- a/NX4Comms.py +++ b/NX4Comms.py @@ -1,3 +1,4 @@ +# coding=utf-8 """ ////////////////////////////////////////////////////////////////////////////////// // OpenNX4 - Open source firmware for Barco NX4 tiles @@ -21,133 +22,144 @@ """ -import argparse,struct,Queue -import serial,time,threading +from __future__ import print_function, division + import random import sys +import threading +import time + +import serial +from PIL.XVThumbImagePlugin import b + from NX4CommsHeaderReader import NX4 from VideoSource import VideoSource +try: # python3 + import queue +except ImportError: # python2 + import Queue as queue + try: - import pygame #for loading images, not vital + import pygame # for loading images, not vital + pygame.init() - HAS_PYGAME=True -except: - HAS_PYGAME=False + HAS_PYGAME = True +except ImportError: + HAS_PYGAME = False -#DOT_CORRECT_DEFAULT=1 #5 #6 bit value +# DOT_CORRECT_DEFAULT=1 #5 #6 bit value -DEFAULT_COM_PORT="/dev/ttyUSB0" #use a COM if on windows +DEFAULT_COM_PORT = "/dev/ttyUSB0" # use a COM if on windows -BAUD=115200<255: - raise Exception("Message too long for this mode (%d) use larger lengthPrefix" % l) - b=chr(l&0xff)+b - elif self.prefixLength==2: - if l>65535: - raise Exception("Message too long for this mode (%d)" % l) - b=chr(l>>8)+chr(l&0xff)+b + l -= 1 + if self.prefixLength == 1: + if l > 255: + raise Exception("Message too long for this mode ({:d}) use larger lengthPrefix".format(l)) + b = chr(l & 0xff) + b + elif self.prefixLength == 2: + if l > 65535: + raise Exception("Message too long for this mode ({:d})".format(l)) + b = chr(l >> 8) + chr(l & 0xff) + b return b - - def setSimulationName(self,name): - self.simulationName=name - - def setLengthPrefix(self,prefixLength): - self.prefixLength=prefixLength + def setSimulationName(self, name): + self.simulationName = name + + def setLengthPrefix(self, prefixLength): + self.prefixLength = prefixLength def sendBreak(self): pass def flushComms(self): - self.sendFlush=False + self.sendFlush = False self.sendBreak() -#////////////////////////////////////////////////////// + +# ////////////////////////////////////////////////////// class VerilogTestGenBase(IODevice): - TIME_SCALE=0.0001 - SIMULATION=True - MAX_SIM_WRITES=5000 + TIME_SCALE = 0.0001 + SIMULATION = True + MAX_SIM_WRITES = 5000 - PREFIX="" - SUFFIX="" - - def __init__(self,deviceParams): + PREFIX = "" + SUFFIX = "" + + def __init__(self, deviceParams): IODevice.__init__(self) - self.simulationName="unknown" #this is not known till after init and is written in - self.deviceParams=deviceParams + self.simulationName = "unknown" # this is not known till after init and is written in + self.deviceParams = deviceParams + + def sleep(self, t): + # todo! insert delay into simulation + pass - def sleep(self,t): - #todo! insert delay into simulation - pass - def openTransport(self): - self.tests=[] - + self.tests = [] + def closeTransport(self): - fileName="%s%s.v" % (self.SIMULATION_NAME_PREFIX,self.simulationName) - filePath=SIM_SCRIPT_FOLDER+fileName - with open(filePath, "wt") as f: + fileName = "{}{}.v".format(self.SIMULATION_NAME_PREFIX, self.simulationName) + filePath = SIM_SCRIPT_FOLDER + fileName + with open(filePath, "w") as f: f.write(self.PREFIX) - tests="\n".join(self.tests) - f.write( "\n// *** AUTOGENERATED DO NOT EDIT : %s *** \n\n" % filePath) - f.write( tests ) + tests = "\n".join(self.tests) + f.write("\n// *** AUTOGENERATED DO NOT EDIT : {} *** \n\n".format(filePath)) + f.write(tests) f.write(self.SUFFIX) - #print "\n\n\n\n\n",tests - print "Wrote ISE test script ",filePath - - def writeSim(self,test): - self.tests+=[test] - if len(self.tests)==self.MAX_SIM_WRITES: - self.stop=True + print("\n\n\n\n\n{}".format(tests)) + print("Wrote ISE test script {}".format(filePath)) + def writeSim(self, test): + self.tests += [test] + if len(self.tests) == self.MAX_SIM_WRITES: + self.stop = True - - -#///////////////////////////////////////////// +# ///////////////////////////////////////////// class SPISimulation(VerilogTestGenBase): - SIMULATION_NAME_PREFIX="spi_" - PREFIX=""" + SIMULATION_NAME_PREFIX = "spi_" + PREFIX = """ always begin:test integer i; `define SPICLK2 100 @@ -157,985 +169,929 @@ class SPISimulation(VerilogTestGenBase): #400 cs=1; """ - SUFFIX=""" + SUFFIX = """ //end of test end """ - WIDTH=8 - - def assertCS(self,active): - self.writeSim("#`SPICLK2 `cs=%d;" % (1-active)) + WIDTH = 8 + + def assertCS(self, active): + self.writeSim("#`SPICLK2 `cs={:d};".format(1 - active)) - def mosiSPIByte(self,value): - self.writeSim("for(i=%d;i>=0;i=i-1) begin" % (self.WIDTH-1) ) - self.writeSim("#`SPICLK2 `mosi=(%d >> i)&1; `sck=1; #`SPICLK2 `sck=0; " % (value) ) + def mosiSPIByte(self, value): + self.writeSim("for(i={:d};i>=0;i=i-1) begin".format(self.WIDTH - 1)) + self.writeSim("#`SPICLK2 `mosi=({:d} >> i)&1; `sck=1; #`SPICLK2 `sck=0; ".format(value)) self.writeSim("end") - def misoSPIByte(self,value): - self.writeSim("tx_data=8'd%d; tx_data_strobe=1; " % (value) ) - + def misoSPIByte(self, value): + self.writeSim("tx_data=8'd{:d}; tx_data_strobe=1; ".format(value)) - def mosiSPI(self,data): + def mosiSPI(self, data): for d in data: self.mosiSPIByte(ord(d)) - def misoSPI(self,data): + def misoSPI(self, data): for d in data: self.misoSPIByte(ord(d)) - + def startMessage(self): - if self.prefixLength==0: + if self.prefixLength == 0: self.assertCS(1) def stopMessage(self): self.assertCS(0) - def sendMessage(self,bytes): - #self.misoSPI(chr(0x8a)) - bytes=self.addPrefix(bytes) + def sendMessage(self, bytes): + # self.misoSPI(chr(0x8a)) + bytes = self.addPrefix(bytes) self.mosiSPI(bytes) - - -#///////////////////////////////////////////// +# ///////////////////////////////////////////// class UARTSimulation(VerilogTestGenBase): + SIMULATION_NAME_PREFIX = "uart_" - SIMULATION_NAME_PREFIX="uart_" - - - PREFIX=""" + PREFIX = """ always begin:test integer i; -`define UARTCLK %d -`define BREAKCLK %d +`define UARTCLK {:d} +`define BREAKCLK {:d} `define rx rx `define cs cs `define sck sck #400 rx=1; - """ % (1000000000/BAUD,(1000000000/BAUD)*BREAK_TIME_BITS ) + """.format(1000000000 // BAUD, (1000000000 // BAUD) * BREAK_TIME_BITS) - SUFFIX=""" + SUFFIX = """ //end of test end """ - WIDTH=8 - - def sendBreak(self): - self.writeSim("#`UARTCLK `rx=0; //send break" ) - self.writeSim("#`BREAKCLK `rx=1; " ) - + WIDTH = 8 - def uartByte(self,value): - #start bit - self.writeSim("#`UARTCLK `rx=0; " ) - #data - self.writeSim("for(i=0;i<%d;i=i+1) begin\t// send 0x%02x" % (self.WIDTH,value) ) - self.writeSim("#`UARTCLK `rx=(%d >> i)&1; " % (value) ) + def sendBreak(self): + self.writeSim("#`UARTCLK `rx=0; //send break") + self.writeSim("#`BREAKCLK `rx=1; ") + + def uartByte(self, value): + # start bit + self.writeSim("#`UARTCLK `rx=0; ") + # data + self.writeSim("for(i=0;i<{:d};i=i+1) begin\t// send 0x{:02x}".format(self.WIDTH, value)) + self.writeSim("#`UARTCLK `rx=({:d} >> i)&1; ".format(value)) self.writeSim("end") - #stop bit - self.writeSim("#`UARTCLK `rx=1; " ) - - + # stop bit + self.writeSim("#`UARTCLK `rx=1; ") def startMessage(self): - if self.prefixLength==0 or self.sendFlush: + if self.prefixLength == 0 or self.sendFlush: self.flushComms() def stopMessage(self): pass - def sendMessage(self,bytes): - if not isinstance(bytes,str): - bytes="".join([chr(d) for d in bytes]) - bytes=self.addPrefix(bytes) + def sendMessage(self, bytes): + if not isinstance(bytes, str): + bytes = "".join([chr(d) for d in bytes]) + bytes = self.addPrefix(bytes) self.writeRaw(bytes) - def writeRaw(self,bytes): + def writeRaw(self, bytes): for b in bytes: self.uartByte(ord(b)) - - def readBytes(self,length): + def readBytes(self, length): return [0 for n in range(length)] - def skipBytes(self,length): + def skipBytes(self, length): pass -#///////////////////////////////////////////// + +# ///////////////////////////////////////////// class UART(IODevice): - TIME_SCALE=1.0 - DEBUG=False + TIME_SCALE = 1.0 + DEBUG = False - def __init__(self,deviceParams={"port":"/dev/ttyUSB0"}): + def __init__(self, deviceParams={"port": "/dev/ttyUSB0"}): IODevice.__init__(self) - self.ttyDevice=deviceParams["port"] - self.baud=BAUD - self.stop=False + self.ttyDevice = deviceParams["port"] + self.baud = BAUD + self.stop = False self.flushReply() def openTransport(self): - self.serial = serial.Serial(self.ttyDevice, self.baud, stopbits=2, timeout=0.2, xonxoff=0, rtscts=False,dsrdtr=False) + self.serial = serial.Serial(self.ttyDevice, self.baud, stopbits=2, timeout=0.2, xonxoff=0, rtscts=False, dsrdtr=False) self.serial.flush() - self.listener=threading.Thread(target=self.listen) - self.listener.daemon=True + self.listener = threading.Thread(target=self.listen) + self.listener.daemon = True self.listener.start() def closeTransport(self): - self.stop=True + self.stop = True time.sleep(1) self.serial.close() - #todo properly + # todo properly def listen(self): - totalRx=0 - self.skipByteCount=0 + totalRx = 0 + self.skipByteCount = 0 while True: - data=self.serial.read(1) + data = self.serial.read(1) if self.stop: return if len(data): - totalRx+=len(data) + totalRx += len(data) for d in data: if self.skipByteCount: - self.skipByteCount-=1 + self.skipByteCount -= 1 else: self.readQ.put(ord(d)) if self.DEBUG: - print "<%02x" % ord(d), + print("<{:02x}".format(ord(d)), end=' ') if self.DEBUG: - print "" - """ - print "\t>", - for d in data: - print "%02x" % ord(d), - print "" - """ - #print ">",totalRx,"<" - #if data: - # print print "0x%02x" % ord(data),data + print("") + # print("\t>", end=' ') + # for d in data: + # print("{:02x}".format(ord(d)), end=' ') + # print("") + # print(">{}<".format(totalRx)) + # if data: + # print(("0x%02x" % ord(data),data)) def sendBreak(self): - self.serial.sendBreak() #this is really slow on linux - + self.serial.sendBreak() # this is really slow on linux def startMessage(self): - if self.prefixLength==0 or self.sendFlush: + if self.prefixLength == 0 or self.sendFlush: self.flushComms() def stopMessage(self): pass - #must be complete message in one call if prefixLength!=0 - def sendMessage(self,bytes): - if not isinstance(bytes,str): - bytes="".join([chr(d) for d in bytes]) - bytes=self.addPrefix(bytes) + # must be complete message in one call if prefixLength!=0 + def sendMessage(self, bytes): + if not isinstance(bytes, str): + bytes = "".join([chr(d) for d in bytes]) + bytes = self.addPrefix(bytes) if self.DEBUG: for d in bytes: - print ">%02x" % d, - print "" + print(">{:02x}".format(d), end=' ') + print("") self.writeRaw(bytes) - def writeRaw(self,b): + def writeRaw(self, b): self.serial.write(b) + def skipBytes(self, length): + self.skipByteCount += length # dubiously threadsafe but eh - def skipBytes(self,length): - self.skipByteCount+=length #dubiously threadsafe but eh - - - def readBytes(self,length): - reply=[] + def readBytes(self, length): + reply = [] for n in range(length): try: - d=self.readQ.get(True,1) + d = self.readQ.get(True, 1) reply.append(d) - except Queue.Empty: - raise ReadTimeout("Read timeout from NX4, got %d of %d" % (len(reply),length)) + except queue.Empty: + raise ReadTimeout("Read timeout from NX4, got {:d} of {:d}".format(len(reply), length)) return reply def flushReply(self): - self.readQ=Queue.Queue() - + self.readQ = queue.Queue() -#///////////////////////////////////////////// +# ///////////////////////////////////////////// class NX4Connector(object): - - #talks to the command processor module in the fpga - def __init__(self,ioDevice,unitID=0): - self.unitID=0 - self.upperMemBits=0xff - self.ioDevice=ioDevice + # talks to the command processor module in the fpga - #remember not to trounce the default settings; could also just read them - self.ioctl=NX4.DEFAULT_BAUD_MULTIPLIER<>8]+values - self.sendIOMessage(bytes) - - def writeFBRaw(self,addr,fbBytes): #takes string of bytes instead of array of ints - bytes=[ NX4.CP_CMD_MODE_FB | (self.unitID<>8] - bytes="".join([chr(d) for d in bytes]) - self.sendIOMessage(bytes+fbBytes) - - - def writeIL(self,addr,values): - cmode=NX4.CP_CMD_MODE_GAMMA - isWrite=1 - bytes=[ cmode | (self.unitID<> 8] + values + self.sendIOMessage(bytes) + + def writeFBRaw(self, addr, fbBytes): # takes string of bytes instead of array of ints + bytes = [NX4.CP_CMD_MODE_FB | (self.unitID << NX4.CP_CMD_MODE_BIT_UNIT_ID) | (1 << NX4.CP_CMD_MODE_BIT_WRITE), + addr & 0xff, addr >> 8] + bytes = "".join([chr(d) for d in bytes]) + self.sendIOMessage(bytes + fbBytes) + + def writeIL(self, addr, values): + cmode = NX4.CP_CMD_MODE_GAMMA + isWrite = 1 + bytes = [cmode | (self.unitID << NX4.CP_CMD_MODE_BIT_UNIT_ID) | (isWrite << NX4.CP_CMD_MODE_BIT_WRITE), + addr & 0xff] + values + self.sendIOMessage(bytes) + + def writeMem(self, addr, values, isFlash=1, isWrite=1): if not hasattr(values, '__iter__'): - values=[values] + values = [values] self.setUpperMemAddress(addr) - cmode=NX4.CP_CMD_MODE_FLASH if isFlash else NX4.CP_CMD_MODE_SRAM - bytes=[ cmode | (self.unitID<>8)&0xff]+values - self.sendIOMessage(bytes) - - def setUpperMemAddress(self,addr): - umb=addr>>16 - if umb==self.upperMemBits: + cmode = NX4.CP_CMD_MODE_FLASH if isFlash else NX4.CP_CMD_MODE_SRAM + bytes = [cmode | (self.unitID << NX4.CP_CMD_MODE_BIT_UNIT_ID) | (isWrite << NX4.CP_CMD_MODE_BIT_WRITE), + addr & 0xff, (addr >> 8) & 0xff] + values + self.sendIOMessage(bytes) + + def setUpperMemAddress(self, addr): + umb = addr >> 16 + if umb == self.upperMemBits: return - self.upperMemBits=umb + self.upperMemBits = umb self.writeRegister(NX4.OpenNX4_REG_MEM_UPPER_ADDR, umb) - - def readMem(self,addr,length,isFlash=1): - values=[0 for d in range(length)] + + def readMem(self, addr, length, isFlash=1): + values = [0 for d in range(length)] self.setUpperMemAddress(addr) - #if length>256: + # if length>256: # self.setLengthPrefix(2) - #same as a write but you send dummy values each of which triggers a read - self.writeMem(addr,values,isFlash=isFlash,isWrite=0) + # same as a write but you send dummy values each of which triggers a read + self.writeMem(addr, values, isFlash=isFlash, isWrite=0) return self.readBytes(length) - - def readRegister(self,addr,count=1): - bytes=[ NX4.CP_CMD_MODE_REGISTERS | (self.unitID<>8,i2cRegAddr&0xff] - - reply=self.i2cTransaction(i2cAddr,i2Addr,readLength=length) - if reply==False: - print ("I2C NACK on address %02x" % i2cAddr) + i2Addr = [i2cRegAddr >> 8, i2cRegAddr & 0xff] + + reply = self.i2cTransaction(i2cAddr, i2Addr, readLength=length) + if reply == False: + print("I2C NACK on address {:02x}".format(i2cAddr)) return False - return reply #array of ints - - def i2cWrite(self,i2cAddr,i2cRegAddr,data,address16Bit=False): - i2Addr=[i2cRegAddr] + return reply # array of ints + + def i2cWrite(self, i2cAddr, i2cRegAddr, data, address16Bit=False): + i2Addr = [i2cRegAddr] if address16Bit: - i2Addr=[i2cRegAddr>>8,i2cRegAddr&0xff] - data=i2Addr+data - reply=self.i2cTransaction(i2cAddr,data,readLength=0) - if reply==False: - print ("I2C NACK on address %02x" % i2cAddr) + i2Addr = [i2cRegAddr >> 8, i2cRegAddr & 0xff] + data = i2Addr + data + reply = self.i2cTransaction(i2cAddr, data, readLength=0) + if reply == False: + print("I2C NACK on address {:02x}".format(i2cAddr)) return False - - def generateSCLSDA(self,scl,sda): - mask=(1<>=1 - scl=0 - sda=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) - scl=1 - #sample ACK bit here - samplePos=len(ioctlBytes) - ioctlBytes+=self.generateSCLSDA(scl,sda) - scl=0 - ioctlBytes+=self.generateSCLSDA(scl,sda) + scl = 0 + ioctlBytes += self.generateSCLSDA(scl, sda) + sda = 1 if byte & bit else 0 + ioctlBytes += self.generateSCLSDA(scl, sda) + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + + bit >>= 1 + scl = 0 + sda = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + scl = 1 + # sample ACK bit here + samplePos = len(ioctlBytes) + ioctlBytes += self.generateSCLSDA(scl, sda) + scl = 0 + ioctlBytes += self.generateSCLSDA(scl, sda) if sendStop: - sda=0 - scl=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) - sda=1 - scl=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) - - replyData=self.nx4.writeRegister(NX4.OpenNX4_REG_IOCTL,ioctlBytes) - + sda = 0 + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + sda = 1 + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + + replyData = self.nx4.writeRegister(NX4.OpenNX4_REG_IOCTL, ioctlBytes) + if self.DEBUG: - dp=0 + dp = 0 for d in ioctlBytes: - r=(replyData[dp]&(1<>3&1,d>>4&1,r,"<" if dp==samplePos else "") - dp+=1 - print samplePos,len(ioctlBytes),len(replyData) - #check for ack - sample=(replyData[samplePos]&(1<> 3 & 1, d >> 4 & 1, r, "<" if dp == samplePos else "")) + dp += 1 + print("{} {} {}".format(samplePos, len(ioctlBytes), len(replyData))) + # check for ack + sample = (replyData[samplePos] & (1 << NX4.OpenNX4_STATUS0_BIT_I2C_SDA)) == 0 if self.isSimulation: return True return sample - def i2cRxByte(self,sendNACK=0): - ioctlBytes=[] - samplePositions=[] - bit=0x80 + def i2cRxByte(self, sendNACK=0): + ioctlBytes = [] + samplePositions = [] + bit = 0x80 if self.DEBUG: - print "---RX" - - sda=1 + print("---RX") + + sda = 1 while bit: - scl=0 - ioctlBytes+=self.generateSCLSDA(scl,sda) - scl=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) - #sample SDA here + scl = 0 + ioctlBytes += self.generateSCLSDA(scl, sda) + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + # sample SDA here samplePositions.append(len(ioctlBytes)) - bit>>=1 - #send ack or stop - scl=0 - sda=sendNACK - ioctlBytes+=self.generateSCLSDA(scl,sda) - scl=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) - scl=0 - ioctlBytes+=self.generateSCLSDA(scl,sda) - - #extra for debugging - #if self.DEBUG: + bit >>= 1 + # send ack or stop + scl = 0 + sda = sendNACK + ioctlBytes += self.generateSCLSDA(scl, sda) + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + scl = 0 + ioctlBytes += self.generateSCLSDA(scl, sda) + + # extra for debugging + # if self.DEBUG: # for n in range(5): # ioctlBytes+=self.generateSCLSDA(scl,sda) - #if we sent a NACK, send a stop as well + # if we sent a NACK, send a stop as well if sendNACK: - sda=0 - scl=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) - sda=1 - scl=1 - ioctlBytes+=self.generateSCLSDA(scl,sda) + sda = 0 + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) + sda = 1 + scl = 1 + ioctlBytes += self.generateSCLSDA(scl, sda) - replyData=self.nx4.writeRegister(NX4.OpenNX4_REG_IOCTL,ioctlBytes) + replyData = self.nx4.writeRegister(NX4.OpenNX4_REG_IOCTL, ioctlBytes) - bit=0x80 - byte=0 + bit = 0x80 + byte = 0 for samplePos in samplePositions: - sample=replyData[samplePos]&(1<>=1 + byte |= bit + bit >>= 1 if self.DEBUG: - dp=0 + dp = 0 for d in ioctlBytes: - r=(replyData[dp]&(1<>3&1,d>>4&1,r,"<" if dp in samplePositions else "") - dp+=1 - print "=%02x" % byte + r = (replyData[dp] & (1 << NX4.OpenNX4_STATUS0_BIT_I2C_SDA)) == 1 + print("{:d}{:d} {:d} {}\t".format(d >> 3 & 1, d >> 4 & 1, r, "<" if dp in samplePositions else "")) + dp += 1 + print("={:02x}".format(byte)) return byte - - def i2cTransaction(self,i2cAddr,txData,readLength): - reply=True - self.setAddressModulo(1,0) - if not self.i2cTxByte(i2cAddr<<1,sendStart=True): - #got a NACK + def i2cTransaction(self, i2cAddr, txData, readLength): + reply = True + self.setAddressModulo(1, 0) + if not self.i2cTxByte(i2cAddr << 1, sendStart=True): + # got a NACK return False if self.DEBUG: - print "--- TX OK ON %02x" % i2cAddr - dpos=0 + print("--- TX OK ON {:02x}".format(i2cAddr)) + dpos = 0 for d in txData: - isLastByte= (dpos==len(txData)-1) - sendStop=(isLastByte and readLength==0) - if not self.i2cTxByte(d,sendStop = sendStop ): + isLastByte = (dpos == len(txData) - 1) + sendStop = (isLastByte and readLength == 0) + if not self.i2cTxByte(d, sendStop=sendStop): return False - dpos+=1 + dpos += 1 if readLength: - if not self.i2cTxByte((i2cAddr<<1)|1,sendStart=True): - #got a NACK + if not self.i2cTxByte((i2cAddr << 1) | 1, sendStart=True): + # got a NACK return False - - reply=[] + + reply = [] for r in range(readLength): - isLastByte= (r==readLength-1) - reply.append( self.i2cRxByte(sendNACK=isLastByte) ) + isLastByte = (r == readLength - 1) + reply.append(self.i2cRxByte(sendNACK=isLastByte)) return reply +# ///////////////////////////////////////////// +PIX = 1 << 0 # temp hacking constants for bitbang +SCLK = 1 << 3 +XLAT = 1 << 2 +MODE = 1 << 1 +DC_WIDTH = 6 +GS_WIDTH = 12 -#///////////////////////////////////////////// - -PIX=1<<0 #temp hacking constants for bitbang -SCLK=1<<3 -XLAT=1<<2 -MODE=1<<1 -DC_WIDTH=6 -GS_WIDTH=12 class OpenNX4(NX4BitbangedI2C): - - #https://cdn-shop.adafruit.com/datasheets/TSL2561.pdf - I2C_ADDR_LIGHT_SENSOR=0x39 #: LED Panel ambient light sensor - #http://www.analog.com/media/en/technical-documentation/data-sheets/AD7416_7417_7418.pdf - #https://github.com/rushup/Kitra530-kernel/blob/master/drivers/hwmon/ad7418.c - I2C_ADDR_LED_TEMP=0x48 #: LED Panel temperature sensor - I2C_ADDR_CTRL_TEMP=0x49 #): Control Board temperature sensor - #http://www.atmel.com/images/doc1116.pdf - I2C_ADDR_LED_EEPROM=0x50 #): LED Panel EEPROM (512kbit/64kbyte) - I2C_ADDR_CTRL_EEPROM=0x51 #): Control Board EEPROM (512kbit/64kbyte) - - - def __init__(self,simulate=False): - self.isSimulation=simulate + # https://cdn-shop.adafruit.com/datasheets/TSL2561.pdf + I2C_ADDR_LIGHT_SENSOR = 0x39 #: LED Panel ambient light sensor + # http://www.analog.com/media/en/technical-documentation/data-sheets/AD7416_7417_7418.pdf + # https://github.com/rushup/Kitra530-kernel/blob/master/drivers/hwmon/ad7418.c + I2C_ADDR_LED_TEMP = 0x48 #: LED Panel temperature sensor + I2C_ADDR_CTRL_TEMP = 0x49 # ): Control Board temperature sensor + # http://www.atmel.com/images/doc1116.pdf + I2C_ADDR_LED_EEPROM = 0x50 # ): LED Panel EEPROM (512kbit/64kbyte) + I2C_ADDR_CTRL_EEPROM = 0x51 # ): Control Board EEPROM (512kbit/64kbyte) + + def __init__(self, simulate=False): + self.isSimulation = simulate if simulate: - self.ioDevice=UARTSimulation({}) + self.ioDevice = UARTSimulation({}) else: - self.ioDevice=UART({"port":DEFAULT_COM_PORT}) + self.ioDevice = UART({"port": DEFAULT_COM_PORT}) self.ioDevice.openTransport() - - #eh just lazy + + # eh just lazy self.ioDevice.setSimulationName(SIM_TEST_FILE) - - self.nx4=NX4Connector(self.ioDevice,unitID=0) - self.pixelWord=0 - self.counter=0 + self.nx4 = NX4Connector(self.ioDevice, unitID=0) - #self.precalcScan() + self.pixelWord = 0 + self.counter = 0 - self.images=[] + # self.precalcScan() - self.testLEDs=[0 for d in range(3*32*36)] - self.count=100000 - if self.isSimulation: - self.count=20 - + self.images = [] + self.testLEDs = [0 for d in range(3 * 32 * 36)] + self.count = 100000 + if self.isSimulation: + self.count = 20 def close(self): - #important if you're simulating, not so much for real + # important if you're simulating, not so much for real self.ioDevice.closeTransport() - def sleep(self,t): #real sleep or dummy if simulating + def sleep(self, t): # real sleep or dummy if simulating self.ioDevice.sleep(t) - def selectFB(self,fbIndex): - mask=(1<12b pixel conversion table; this boots up with sensible values already + # write the 8b->12b pixel conversion table; this boots up with sensible values already def testILWrite(self): - ilAddr=0x00 - ilData=[] - #this write isn't quite right in the verilog + ilAddr = 0x00 + ilData = [] + # this write isn't quite right in the verilog for ilCount in range(256): - ilValue=ilCount<<4 #crappy test; this isn't as nice as the default (cie curve) lookup - ilValue= 0xfff if ilValue!=0 else 0 - ilData.append( ilValue&0xff) - ilData.append( ilValue>>8 ) - #self.setAddressModulo(0) + ilValue = ilCount << 4 # crappy test; this isn't as nice as the default (cie curve) lookup + ilValue = 0xfff if ilValue != 0 else 0 + ilData.append(ilValue & 0xff) + ilData.append(ilValue >> 8) + # self.setAddressModulo(0) self.nx4.setLengthPrefix(2) - self.nx4.writeIL(0,ilData) - + self.nx4.writeIL(0, ilData) - def displayImage(self,index,x,y): - imgW,imgH,pixels=self.images[index] - width,height=32,36 - fb=[] - fbAddr=(x*3)+(y*height*3) #x,y not really useful, doesn't clip image yet + def displayImage(self, index, x, y): + imgW, imgH, pixels = self.images[index] + width, height = 32, 36 + fb = [] + fbAddr = (x * 3) + (y * height * 3) # x,y not really useful, doesn't clip image yet for h in range(height): - src=imgW*3*h - for w in range(width*3): - fb.append(pixels[src+w]) #rgb + src = imgW * 3 * h + for w in range(width * 3): + fb.append(pixels[src + w]) # rgb self.nx4.setLengthPrefix(2) - self.nx4.writeFB(fbAddr,fb) - + self.nx4.writeFB(fbAddr, fb) def testImageDisplay(self): if not self.images: - self.loadImage("pacman.png") - self.loadImage("nyan_cat.png") - - self.setFBTarget(1) #write to both at the same time (may be useful? whatever it's free) - y=0 - x=0 - self.displayImage(0,x,y) - self.setFBTarget(2) #write to both at the same time (may be useful? whatever it's free) - y=0 - x=0 - self.displayImage(1,x,y) - - + self.loadImage("pacman.png") + self.loadImage("nyan_cat.png") + + self.setFBTarget(1) # write to both at the same time (may be useful? whatever it's free) + y = 0 + x = 0 + self.displayImage(0, x, y) + self.setFBTarget(2) # write to both at the same time (may be useful? whatever it's free) + y = 0 + x = 0 + self.displayImage(1, x, y) def testI2C(self): - tests=[self.I2C_ADDR_LIGHT_SENSOR,self.I2C_ADDR_LED_TEMP,self.I2C_ADDR_CTRL_TEMP,self.I2C_ADDR_LED_EEPROM,self.I2C_ADDR_CTRL_EEPROM] - length=2 + tests = [self.I2C_ADDR_LIGHT_SENSOR, self.I2C_ADDR_LED_TEMP, self.I2C_ADDR_CTRL_TEMP, self.I2C_ADDR_LED_EEPROM, self.I2C_ADDR_CTRL_EEPROM] + length = 2 for i2cAddr in tests: for i2cRegAddr in range(2): - print "Read I2C %02x reg %02x" % (i2cAddr,i2cRegAddr) - print "=%02x" % self.i2cRead(i2cAddr,i2cRegAddr,1)[0] - #self.setAddressModulo(0) #i2c leaves write modulo wacky + print("Read I2C {:02x} reg {:02x}".format(i2cAddr, i2cRegAddr)) + print("={:02x}".format(self.i2cRead(i2cAddr, i2cRegAddr, 1)[0])) + # self.setAddressModulo(0) #i2c leaves write modulo wacky self.toggleRedLED() - - #not working yet + + # not working yet def readLightSensor(self): - #https://cdn-shop.adafruit.com/datasheets/TSL2561.pdf - i2cAddr=self.I2C_ADDR_LIGHT_SENSOR - - i2cRegAddr=0 | 0x80 - controlReg=3 #3=power on - if self.i2cWrite(i2cAddr,i2cRegAddr,[controlReg]): - print "Light sensor not responding" + # https://cdn-shop.adafruit.com/datasheets/TSL2561.pdf + i2cAddr = self.I2C_ADDR_LIGHT_SENSOR + + i2cRegAddr = 0 | 0x80 + controlReg = 3 # 3=power on + if self.i2cWrite(i2cAddr, i2cRegAddr, [controlReg]): + print("Light sensor not responding") return - #read it back to check - checkControl=self.i2cRead(i2cAddr,i2cRegAddr,1)[0] - print "rbc",checkControl + # read it back to check + checkControl = self.i2cRead(i2cAddr, i2cRegAddr, 1)[0] + print("rbc {}".format(checkControl)) - i2cRegAddr=0xa|0x80 #id register - partId=self.i2cRead(i2cAddr,i2cRegAddr,1)[0] - print "Part number= %02x" % partId - - i2cRegAddr=1 | 0x80 - timingReg=0x02 #2=default; controls gain, integration time - partId=self.i2cWrite(i2cAddr,i2cRegAddr,[timingReg]) - while True: - i2cRegAddr=0x0c | 0x80 - adcReg=self.i2cRead(i2cAddr,i2cRegAddr,4) - ch0=adcReg[0]<<8 | adcReg[1] - ch1=adcReg[2]<<8 | adcReg[3] - print ch0,ch1 - + i2cRegAddr = 0xa | 0x80 # id register + partId = self.i2cRead(i2cAddr, i2cRegAddr, 1)[0] + print("Part number= {:02x}".format(partId)) - def setBB(self,bbBit,bbVal,noWrite=False): - self.bbState= (self.bbState & ~bbBit)|(bbVal*bbBit) + i2cRegAddr = 1 | 0x80 + timingReg = 0x02 # 2=default; controls gain, integration time + partId = self.i2cWrite(i2cAddr, i2cRegAddr, [timingReg]) + while True: + i2cRegAddr = 0x0c | 0x80 + adcReg = self.i2cRead(i2cAddr, i2cRegAddr, 4) + ch0 = adcReg[0] << 8 | adcReg[1] + ch1 = adcReg[2] << 8 | adcReg[3] + print("{} {}".format(ch0, ch1)) + + def setBB(self, bbBit, bbVal, noWrite=False): + self.bbState = (self.bbState & ~bbBit) | (bbVal * bbBit) if not noWrite: - self.nx4.writeRegister(NX4.OpenNX4_REG_BITBANG_TEST,[self.bbState]) - - #baud stuff - def calcClockSettings(): - baseClk=40000000 - base=int(baseClk/115200) - for div in range(8): - actualBaud=baseClk/(base>>div) - print div,actualBaud + self.nx4.writeRegister(NX4.OpenNX4_REG_BITBANG_TEST, [self.bbState]) + # baud stuff + def calcClockSettings(self): + baseClk = 40000000 + base = int(baseClk / 115200) + for div in range(8): + actualBaud = baseClk / (base >> div) + print("{} {}".format(div, actualBaud)) - def testVideoPlayback(self,sourceFile,fps=25): - vs=VideoSource(tileW=32,tileH=36,tileArrayW=1,tileArrayH=1,tileIndexX=0,tileIndexY=0) - #vs=VideoSource(tileW=32,tileH=36,tileArrayW=3,tileArrayH=3,tileIndexX=1,tileIndexY=1) + def testVideoPlayback(self, sourceFile, fps=25): + vs = VideoSource(tileW=32, tileH=36, tileArrayW=1, tileArrayH=1, tileIndexX=0, tileIndexY=0) + # vs=VideoSource(tileW=32,tileH=36,tileArrayW=3,tileArrayH=3,tileIndexX=1,tileIndexY=1) - vs.start(sourceFile,fps=fps) - fbTarget=0 - frameCount=0 - debug=False + vs.start(sourceFile, fps=fps) + fbTarget = 0 + frameCount = 0 + debug = False while True: - frame=vs.nextFrame() + frame = vs.nextFrame() - playTime=frameCount/fps - print "%02d:%02d:%02d - %06d" % (int(playTime/60),int(playTime%60),(playTime%1)%100,frameCount) - frameCount+=1 + playTime = frameCount / fps + print("{:02d}:{:02d}:{:02d} - {:06d}".format(int(playTime / 60), int(playTime % 60), (playTime % 1) % 100, frameCount)) + frameCount += 1 - - #for now, until we get row scan working, duplicate each row 6 times, effectively lowering the vertical res to 6 pixels :-) - dupeRows=False + # for now, until we get row scan working, duplicate each row 6 times, effectively lowering the vertical res to 6 pixels :-) + dupeRows = False if dupeRows: - frameOut="" - height=36 - row=0 - while row>3) - + # f=random.randint(0,255) + self.nx4.cpldTest = (self.nx4.cpldTest + 1) # f #^=1 #^=(1<> 3)) - - #obsolete stuff that bit-bangs the led ports for initial testing + # obsolete stuff that bit-bangs the led ports for initial testing """ def shiftPix(self,width,value,repeats=1,latch=False): - for r in xrange(repeats): - for n in xrange(width): + for r in range(repeats): + for n in range(width): bit=(value>>((width-1)-n))&1 self.setBB(PIX,bit) self.setBB(SCLK,1) @@ -1174,7 +1128,7 @@ def writePixelRegs(self): self.pixelRegsDump.append(rDriver) def flushPixelRegs(self): - #print "Flushing",len(self.pixelRegsDump) + print("Flushing {}".format(len(self.pixelRegsDump)) #the target modulo is set so writing two values loops forever over the two bitbang destination registers self.nx4.writeRegister(NX4.OpenNX4_REG_DRIVERL_BITBANG_TEST,self.pixelRegsDump,skipReply=True) self.pixelRegsDump=[] @@ -1335,7 +1289,7 @@ def bitBangLEDs(self): #set it to use 1-byte length prefixes to delineate commands pixelPos=0 nextRow=(6*32*3) - for n in xrange(self.count): + for n in range(self.count): self.bitbangDotCorrect(DOT_CORRECT_DEFAULT) for r in range(6): @@ -1351,29 +1305,19 @@ def bitBangLEDs(self): #print "%.4f" % (time.time()-ts) if self.ioDevice.stop: return - """ - - def loadImage(self,filename): + """ + + def loadImage(self, filename): if not HAS_PYGAME: raise Exception("Need to install pygame (e.g. 'apt-get install python-pygame', also available on windows)") - image=pygame.image.load(filename) - print "Loaded %s : %dx%d" % (filename,image.get_width(),image.get_height() ) - imageID=len(self.images) - self.images.append( (image.get_width(),image.get_height(),[ ord(d) for d in pygame.image.tostring(image,"RGB")] ) ) + image = pygame.image.load(filename) + print("Loaded {} : {:d}x{:d}".format(filename, image.get_width(), image.get_height())) + imageID = len(self.images) + self.images.append((image.get_width(), image.get_height(), [ord(d) for d in pygame.image.tostring(image, "RGB")])) return imageID - - - - - - - - - - -#//////////////////////////////////////////////// +# //////////////////////////////////////////////// """ todo cmdline args DEFAULT_SERIAL_PORT="/dev/ttyUSB0" @@ -1395,10 +1339,10 @@ def loadImage(self,filename): """ -test="uart" -simulate=False -if len(sys.argv)>1: - simulate=(sys.argv[1]=="test") +test = "uart" +simulate = False +if len(sys.argv) > 1: + simulate = (sys.argv[1] == "test") """ if test=="spi": @@ -1406,55 +1350,47 @@ def loadImage(self,filename): t=SPITest().generateTest("spi_tester_gen.v") """ +if test == "uart": + o = OpenNX4(simulate=simulate) -if test=="uart": - o=OpenNX4(simulate=simulate) - - - brightness=5 #goes up to 63.. BE CAREFUL b/c we haven't got row scanning running, keep this low (say <20) or you may overheat the single row LEDs - #o.testILWrite() - o.setPixelClock(2,0) - o.loadDotCorrect(brightness) #load all pixels w/same dotcorrect value + brightness = 5 # goes up to 63.. BE CAREFUL b/c we haven't got row scanning running, keep this low (say <20) or you may overheat the single row LEDs + # o.testILWrite() + o.setPixelClock(2, 0) + o.loadDotCorrect(brightness) # load all pixels w/same dotcorrect value o.clearFB() - #this is so slow you can see the rows scanning, is kinda cool/useful - #o.setPixelClock(9,0) + # this is so slow you can see the rows scanning, is kinda cool/useful + # o.setPixelClock(9,0) while not o.ioDevice.stop: - + try: - #o.testFlashPowerLED() #'hello world!' - #o.testWriteFB() - #o.testSetupFill() + # o.testFlashPowerLED() #'hello world!' + # o.testWriteFB() + # o.testSetupFill() o.testImageDisplay() o.testFBBlending() - - #anything ffmpeg will play.. - #o.testVideoPlayback("../../media/test.mkv",fps=23.976215) - #o.ioDevice.stop=True - #o.testDotCorrectLoad() - - #this is a great speed for actual use, 5mhz pixel clock - #which is only 695us for a whole frame (1438fps) - - - #o.testWriteTestPixel() - #o.testI2C() - - #o.testRWMemory() - #o.dumpMainFlash() - #o.dumpEEPROMs() - #o.readLightSensor() - #o.testLoadLEDs() - #o.bitBangLEDs() - o.count+=1 - except ReadTimeout: - print ">> Read timeout <<" - - print "Closing" - o.close() + # anything ffmpeg will play.. + # o.testVideoPlayback("../../media/test.mkv",fps=23.976215) + # o.ioDevice.stop=True + # o.testDotCorrectLoad() + # this is a great speed for actual use, 5mhz pixel clock + # which is only 695us for a whole frame (1438fps) + # o.testWriteTestPixel() + # o.testI2C() + # o.testRWMemory() + # o.dumpMainFlash() + # o.dumpEEPROMs() + # o.readLightSensor() + # o.testLoadLEDs() + # o.bitBangLEDs() + o.count += 1 + except ReadTimeout: + print(">> Read timeout <<") + print("Closing") + o.close() diff --git a/NX4CommsHeaderReader.py b/NX4CommsHeaderReader.py index 4b8404e..3e9b7a6 100644 --- a/NX4CommsHeaderReader.py +++ b/NX4CommsHeaderReader.py @@ -1,19 +1,18 @@ +# coding=utf-8 import re + class NX4CommsHeaderReaderClass(object): - def __init__(self,file="nx4_header_file.vh"): - defines={} - with open(file,"rt") as f: + def __init__(self, file="nx4_header_file.vh"): + with open(file, "r") as f: for l in f.readlines(): - m=re.match(r"`define\s+(\w+)\s+(\d+)",l) + m = re.match(r"`define\s+(\w+)\s+(\d+)", l) if m: - defName=m.group(1) - #only simple integers supported! - defVal=int(m.group(2)) - #print defName,defVal - self.__dict__[defName]=defVal - + defName = m.group(1) + # only simple integers supported! + defVal = int(m.group(2)) + # print(defName,defVal) + self.__dict__[defName] = defVal - -NX4=NX4CommsHeaderReaderClass() +NX4 = NX4CommsHeaderReaderClass() diff --git a/VideoSource.py b/VideoSource.py index 1389019..220783d 100644 --- a/VideoSource.py +++ b/VideoSource.py @@ -1,3 +1,4 @@ +# coding=utf-8 """ #requires #ffmpeg @@ -13,105 +14,112 @@ """ +from __future__ import division, print_function -import time,os,re,sys,threading,Queue +try: # python3 + import queue +except ImportError: # python2 + import Queue as queue -#ignore this bit not using hw accel right now +import os +import threading +import time + +# ignore this bit not using hw accel right now try: - with open("/dev/cedar","rb"): + with open("/dev/cedar", "rb"): pass except: - pass #print "/dev/cedar not present for hardware acceleration" + pass # print "/dev/cedar not present for hardware acceleration" -FIFO_PIPE="/tmp/videopipe" +FIFO_PIPE = "/tmp/videopipe" try: os.mkfifo(FIFO_PIPE) except: pass -QUEUE_LENGTH=4 +QUEUE_LENGTH = 4 + class VideoSource(object): - def __init__(self,tileW=32,tileH=36,tileArrayW=3,tileArrayH=3,tileIndexX=1,tileIndexY=1): - self.lastFrameTime=0 - - #total tile array size - self.tilesX=tileArrayW - self.tilesY=tileArrayH - #each tile - self.tW=tileW - self.tH=tileH - #our x,y offset within that - self.tXOff=tileIndexX*tileW - self.tYOff=tileIndexY*tileH - - self.outQueue=Queue.Queue(maxsize=QUEUE_LENGTH) - - def start(self,sourceFile,fps=25): - self.targetFPS=fps - self.sourceFile=sourceFile - self.listenerThread=threading.Thread(target=self.startListener) - self.listenerThread.daemon=True + def __init__(self, tileW=32, tileH=36, tileArrayW=3, tileArrayH=3, tileIndexX=1, tileIndexY=1): + self.lastFrameTime = 0 + + # total tile array size + self.tilesX = tileArrayW + self.tilesY = tileArrayH + # each tile + self.tW = tileW + self.tH = tileH + # our x,y offset within that + self.tXOff = tileIndexX * tileW + self.tYOff = tileIndexY * tileH + + self.outQueue = queue.Queue(maxsize=QUEUE_LENGTH) + + def start(self, sourceFile, fps=25): + self.targetFPS = fps + self.sourceFile = sourceFile + self.listenerThread = threading.Thread(target=self.startListener) + self.listenerThread.daemon = True self.listenerThread.start() - self.playerThread=threading.Thread(target=self.startPlayer) - self.playerThread.daemon=True + self.playerThread = threading.Thread(target=self.startPlayer) + self.playerThread.daemon = True self.playerThread.start() - def startPlayer(self): - totalWidth,totalHeight=self.tW*self.tilesX , self.tH*self.tilesY - #cmd="ffmpeg -y -i %s -vf scale=w=%d:h=%d -vf crop=%d:%d:%d:%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 /tmp/video -nostats -loglevel 0" % (sourceFile,totalWidth,totalHeight,self.tW,self.tH,self.tXOff,self.tYOff) - #cmd="ffmpeg -y -i %s -vf fps=fps=%d -vf scale=w=%d:h=%d -vf crop=%d:%d:%d:%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 /tmp/video" % (sourceFile,self.targetFPS,totalWidth,totalHeight,self.tW,self.tH,self.tXOff,self.tYOff) - quietOption=" -nostats -loglevel 0" - #cmd="ffmpeg -y -i %s -vf scale=w=%d:h=%d -vf crop=%d:%d:%d:%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 %s" % (self.sourceFile,totalWidth,totalHeight,self.tW,self.tH,self.tXOff,self.tYOff,FIFO_PIPE) - #cropping is odd right now - cmd="ffmpeg -y -i %s -vf scale=w=%d:h=%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 %s" % (self.sourceFile,totalWidth,totalHeight,FIFO_PIPE) - print cmd - cmd+=quietOption - print "Playing",cmd + totalWidth, totalHeight = self.tW * self.tilesX, self.tH * self.tilesY + # cmd="ffmpeg -y -i %s -vf scale=w=%d:h=%d -vf crop=%d:%d:%d:%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 /tmp/video -nostats -loglevel 0" % (sourceFile,totalWidth,totalHeight,self.tW,self.tH,self.tXOff,self.tYOff) + # cmd="ffmpeg -y -i %s -vf fps=fps=%d -vf scale=w=%d:h=%d -vf crop=%d:%d:%d:%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 /tmp/video" % (sourceFile,self.targetFPS,totalWidth,totalHeight,self.tW,self.tH,self.tXOff,self.tYOff) + quietOption = " -nostats -loglevel 0" + # cmd="ffmpeg -y -i %s -vf scale=w=%d:h=%d -vf crop=%d:%d:%d:%d -f rawvideo -vcodec rawvideo -pix_fmt rgb24 %s" % (self.sourceFile,totalWidth,totalHeight,self.tW,self.tH,self.tXOff,self.tYOff,FIFO_PIPE) + # cropping is odd right now + cmd = "ffmpeg -y -i {} -vf scale=w={:d}:h={:d} -f rawvideo -vcodec rawvideo -pix_fmt rgb24 {}".format(self.sourceFile, totalWidth, totalHeight, FIFO_PIPE) + print(cmd) + cmd += quietOption + print("Playing {}".format(cmd)) os.system(cmd) - def readFrame(self,inFile): - frameSize=3*self.tW*self.tH - #may stall here - frame=inFile.read(frameSize) - #may stall here + def readFrame(self, inFile): + frameSize = 3 * self.tW * self.tH + # may stall here + frame = inFile.read(frameSize) + # may stall here self.outQueue.put(frame) - - doPrint=False + + doPrint = False if doPrint: for n in range(16): - print "%02x " % ord(frame[n]), - print "" - now=time.time() - actualFrameTime=now - self.lastFrameTime - self.lastFrameTime=now - frameTime=1.0/self.targetFPS - diff = frameTime - actualFrameTime - if diff>0.001: - #print "sleep",diff + print("{:02x} ".format(ord(frame[n])), end=' ') + print("") + now = time.time() + actualFrameTime = now - self.lastFrameTime + self.lastFrameTime = now + frameTime = 1.0 / self.targetFPS + diff = frameTime - actualFrameTime + if diff > 0.001: + # print "sleep",diff time.sleep(diff) def getDimensions(self): - return tW,tH + return self.tW, self.tH def nextFrame(self): - return self.outQueue.get() #blcoks, fps limited + return self.outQueue.get() # blcoks, fps limited def startListener(self): while True: - with open(FIFO_PIPE,"rb") as f: + with open(FIFO_PIPE, "rb") as f: while True: self.readFrame(f) def test(): - vs=VideoSource() - sourceFile="../../media/1080test.mp4" + vs = VideoSource() + sourceFile = "../../media/1080test.mp4" vs.start(sourceFile) while True: - frame=vs.outQueue.get() - print "yay",ord(frame[0]) - -#test() + frame = vs.outQueue.get() + print("yay {}".format(ord(frame[0]))) +# test()