Peter's Blog

Redefining the Impossible

Python ODBC Stuff


I haven't posted any code for a while.

Here is a Python module called DBTable that encapsulates the odbc module. It works with dictionaries of field name->value mappings, which can be fetched from the database, inserted into the database, queried for or deleted.

    #
    # Database wrapper class.
    #
    # The caller supplies the connection, so no database module is imported here.
    #

    class DBTable:
        """
        Wrapper for database table
        """
        # Index of the type code within a stored field description
        # (a cursor.description entry with the field name removed).
        FIELD_TYPE = 0

        def __init__( self, oConnection, strTable):
            self.oConnection = oConnection
            self.strTable = strTable

            # Query the table once so that the cursor describes its columns.
            oCursor = oConnection.cursor()
            oCursor.execute( "SELECT * FROM %s" % strTable)

            self.oFields = [ oField[0] for oField in oCursor.description]
            self.oFieldDescription = dict( [ (oField[0],
                                              oField[1:]) for oField in oCursor.description])

        def Select( self, strQuery):
            """
            Select records from query

            Takes either the SQL of a select statement or a dictionary containing
            field names and values to find.
            """
            self.oCursor = self.oConnection.cursor()
            if isinstance( strQuery, str):
                self.oCursor.execute( strQuery)
            else:
                #
                # assume query is a dict
                #
                self.oCursor.execute( "SELECT * FROM %s WHERE %s" % (self.strTable,
                                                                     self.DictToWhere( strQuery)))

        def FetchOne( self):
            """
            Get next row of results
            Returns a dictionary holding field names and values,
            or None when there are no more rows.
            """
            oRow = self.oCursor.fetchone()
            if oRow:
                #
                # Build a dictionary to map field name->value
                #
                return dict([(self.oFields[i], oRow[i]) for i in range(len(oRow))])
            else:
                return None

        def Insert( self, oDict):
            """
            Insert a row in the database
            Takes a dictionary holding field names and values.
            """
            strFields = list( oDict.keys())
            strValues = [ self.FormatField( strField, oDict[strField]) for strField in strFields]

            strSQL = "INSERT INTO %s ( %s) VALUES(%s);" % ( self.strTable,
                                                             ", ".join( strFields),
                                                             ", ".join( strValues))
            print(strSQL)
            oCursor = self.oConnection.cursor()
            oCursor.execute( strSQL)
            self.oConnection.commit()

        def Delete( self, oDict):
            """
            Delete row based on dictionary contents
            Takes a dictionary holding field names and values.
            """
            strSQL = "DELETE FROM %s WHERE %s;" % ( self.strTable, self.DictToWhere( oDict))
            print(strSQL)
            oCursor = self.oConnection.cursor()
            oCursor.execute( strSQL)
            self.oConnection.commit()

        def DictToWhere( self, oDict):
            """
            Convert dictionary to WHERE clause.
            """
            strExpressions = []

            for strField in oDict.keys():
                strValue = self.FormatField( strField, oDict[strField])

                strExpressions.append( '%s = %s' % (strField, strValue))

            return " AND ".join( strExpressions)

        def FormatField( self, strField, strValue):
            """
            Format a field for an sql statement.
            """
            strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
            if strType == 'STRING':
                # Quote the value, doubling any embedded single quotes.
                return "'%s'" % str(strValue).replace( "'", "''")
            elif strType == 'NUMBER':
                return '%d' % int( strValue)
            else:
                # String exceptions are not valid; raise a real exception.
                raise ValueError( 'unknown field type %s' % strType)

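The core trick in DBTable is that a DB-API cursor describes its own columns: the first item of each cursor.description entry is the field name, which is all that is needed to turn a row tuple into a dictionary. Here is a minimal sketch of that idea on its own. It uses an in-memory sqlite3 database as a stand-in for an ODBC connection (an assumption, made so that it runs without a configured data source), the table people and the helper fetch_one_as_dict are made up for illustration, and it is written for Python 3, unlike the listing above.

```python
import sqlite3


def fetch_one_as_dict(cursor):
    """Return the next row as {field name: value}, or None when exhausted."""
    row = cursor.fetchone()
    if row is None:
        return None
    # cursor.description holds one entry per column; item 0 is the name.
    names = [column[0] for column in cursor.description]
    return dict(zip(names, row))


# Hypothetical table, used only to have something to query.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE people (id INTEGER, name TEXT)")
connection.execute("INSERT INTO people VALUES (1, 'Ada')")

cursor = connection.cursor()
cursor.execute("SELECT * FROM people")
first = fetch_one_as_dict(cursor)
second = fetch_one_as_dict(cursor)
print(first)
print(second)
```

The second call returns None because the table holds only one row, which is the same end-of-results signal that FetchOne gives.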

Here is a script, DBView.py, that uses DBTable. Run it from the command line as

    DBView.py <odbc connection name> <table name>

and it will create an HTML dump of the specified table from the specified ODBC connection and display it in your default browser. This is useful if, for example, you are working on a project that uses an Access 2003 database but you only have Access 97.

    #
    # Dump database tables in html format.
    #
    # Python 2 code: BaseHTTPServer became http.server in Python 3.
    #
    import sys
    import traceback
    import webbrowser
    import BaseHTTPServer

    import odbc
    import DBTable

    def DumpDBTable( oConnection, strTable):
        "Dump specified table from ODBC connection. Returns None on failure."

        #
        # Start building html output.
        #
        strRows = []

        strRows.append( """<html>
        <head>
        <title>Dump of %s</title>
        </head>
        <body>
    """ % (strTable))

        strRows.append( "<h1>%s</h1>" % (strTable,))
        strRows.append( '<table border="1" cellspacing="0">')
        try:

            oDbTable = DBTable.DBTable( oConnection, strTable)
            oDbTable.Select( 'SELECT * FROM %s' % strTable)

            #
            # Header row holding the field names.
            #
            strRow = "<tr>"

            for strField in oDbTable.oFields:
                strRow += "<th>%s</th>" % strField

            strRow += "</tr>"
            strRows.append( strRow)

            while True:
                oRow = oDbTable.FetchOne()
                if oRow is None:
                    break

                #
                # Walk through the columns in each row of the table.
                #
                strRow = "<tr>"

                for strField in oDbTable.oFields:
                    #
                    # Convert the contents of each field to an entry in the table.
                    # str() turns every value, whatever its type, into text.
                    #
                    strRow += "<td>%s</td>" % str( oRow[strField])

                strRow += "</tr>"
                strRows.append( strRow)
        except Exception:
            print("Failed on %s" % (strTable))
            traceback.print_exc()
            return None

        #
        # Terminate the html.
        #
        strRows.append( "</table>")
        strRows.append( "</body>")
        strRows.append( "</html>")

        return "\n".join( strRows)

    def LoadInDefaultBrowser(html):
        """Display html in the default web browser without creating a temp file.

        Instantiates a trivial http server and calls webbrowser.open with a URL
        to retrieve html from that server.
        """

        class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
            def do_GET(self):
                # Send a status line and headers so the response is valid HTTP.
                self.send_response(200)
                self.send_header('Content-Type', 'text/html')
                self.end_headers()

                bufferSize = 1024*1024
                for i in xrange(0, len(html), bufferSize):
                    self.wfile.write(html[i:i+bufferSize])

        # Port 0 asks the operating system for any free port.
        server = BaseHTTPServer.HTTPServer(('127.0.0.1', 0), RequestHandler)
        webbrowser.open('http://127.0.0.1:%s' % server.server_port)
        # Serve the single request made by the browser, then return.
        server.handle_request()

    if __name__ == '__main__':
        oConnection = odbc.odbc( sys.argv[1])
        strHtml = DumpDBTable( oConnection, sys.argv[2])
        # DumpDBTable returns None if the dump failed, so only show real output.
        if strHtml is not None:
            LoadInDefaultBrowser( strHtml)
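One thing the script above does not do is escape the values it writes into the page, so a field containing < or & could break the table. Here is a small sketch of building the same kind of table from a list of row dictionaries with escaping applied. It assumes Python 3 and its html.escape function; the name rows_to_html_table and the sample data are illustrative and not part of DBView.py.

```python
from html import escape


def rows_to_html_table(fields, rows):
    """Render rows (dicts keyed by field name) as an HTML table string."""
    lines = ['<table border="1" cellspacing="0">']
    # Header row: one <th> per field name.
    header = "".join("<th>%s</th>" % escape(str(f)) for f in fields)
    lines.append("<tr>%s</tr>" % header)
    # Data rows: one <td> per field, escaped so markup in values is shown as text.
    for row in rows:
        cells = "".join("<td>%s</td>" % escape(str(row[f])) for f in fields)
        lines.append("<tr>%s</tr>" % cells)
    lines.append("</table>")
    return "\n".join(lines)


table = rows_to_html_table(["id", "note"], [{"id": 1, "note": "a < b & c"}])
print(table)
```

The rows argument has the same shape as the dictionaries returned by FetchOne, so the two pieces could be combined.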

Filed under: odbc python

9 Comments

dvska Says:

over 4 years ago

It is cool :) You omitted the ">" in strRow += "<th>%s</th" % strField

Peter Says:

over 4 years ago

Oh yeah, thanks.

If anyone is interested, here is the latest version of the DBTable code. I have been using it on a few things and it has matured a little:

  • It now supports both odbc and MySQLdb connections: you just pass it the connection when you create the object.
  • I have added some convenience functions like FetchList and FetchMap to return lists and dictionaries of values.
  • It can generate SQL IN clauses when you pass lists of values instead of individual values in a query, e.g.

        #
        # Print rows in table 'tablename' where column 'id' = 1, 2 or 3.
        #
        o = odbc.odbc( 'db name')
        oDB = DBTable.DBTable( o, 'tablename')
        oDB.Select( { 'id': ( 1,2,3)})
        while True:
            oRow = oDB.FetchOne()
            if not oRow:
                break
            print(oRow)

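To make the IN clause feature concrete, here is a rough standalone sketch of turning such a dictionary into a WHERE clause. It is a simplification and not the DBTable code itself: dict_to_where and format_value are hypothetical helpers that choose the quoting from the Python type of each value, whereas DBTable looks the column type up from the cursor. It is written for Python 3.

```python
def format_value(value):
    """Format one value as an SQL literal (numbers bare, everything else quoted)."""
    if isinstance(value, bool):
        # bool is a subclass of int; write it as 0 or 1.
        return str(int(value))
    if isinstance(value, (int, float)):
        return str(value)
    # Double any single quotes so they survive inside the quoted literal.
    return "'%s'" % str(value).replace("'", "''")


def dict_to_where(conditions):
    """Join field conditions with AND; list or tuple values become IN (...)."""
    expressions = []
    for field, value in conditions.items():
        if isinstance(value, (list, tuple)):
            terms = ", ".join(format_value(term) for term in value)
            expressions.append("%s IN (%s)" % (field, terms))
        else:
            expressions.append("%s = %s" % (field, format_value(value)))
    return " AND ".join(expressions)


where = dict_to_where({"id": (1, 2, 3), "name": "O'Brien"})
print(where)
```

Building SQL by string formatting like this is only reasonable for trusted input; most DB-API drivers also offer parameter binding, which avoids the quoting problem altogether.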

Here is the full DBTable code:

    #
    # Database wrapper class.
    #
    bVerbose = False

    class DBTable:
        """
        Wrapper for database table
        """
        # Index of the type code within a stored field description
        # (a cursor.description entry with the field name removed).
        FIELD_TYPE = 0

        # Table schemas shared by all instances, keyed by table name.
        oFieldCache = {}

        def __init__( self, oConnection, strTable):
            self.oConnection = oConnection
            self.strTable = strTable

            self.oFields = None
            self.oFieldDescription = None

        def PopulateFields( self, oCursor = None):
            """
            Populate field names and descriptions from a cursor.
            """
            #
            # Only populate them if they have not already been populated.
            #
            if self.oFields is None:
                #
                # If table schema is cached then use that
                #
                if self.strTable in DBTable.oFieldCache:
                    self.oFields, self.oFieldDescription = DBTable.oFieldCache[self.strTable]
                    return

                if oCursor is None:
                    #
                    # Cursor not specified, have to do a query here.
                    #
                    oCursor = self.oConnection.cursor()
                    oCursor.execute( "SELECT * FROM %s" % self.strTable)

                #
                # Get field definitions from the cursor.
                #
                self.oFields = [ oField[0] for oField in oCursor.description]
                self.oFieldDescription = dict( [ (oField[0], oField[1:]) for oField in oCursor.description])

                #
                # Add field definitions to the cache.
                #
                DBTable.oFieldCache[ self.strTable] = ( self.oFields, self.oFieldDescription)

        def Select( self, strQuery = None):
            """
            Select records from query

            Takes either the SQL of a select statement or a dictionary containing field names and values to find.
            Additionally, the values in the dictionary can be a sequence of values which causes an SQL
            IN expression to be generated. With no argument, all rows are selected.
            """
            self.oCursor = self.oConnection.cursor()
            if strQuery is None:
                #
                # No WHERE clause: "WHERE 1" is not accepted by every database.
                #
                self.oCursor.execute( "SELECT * FROM %s" % (self.strTable))
            elif isinstance( strQuery, str):
                self.oCursor.execute( strQuery)
            else:
                #
                # assume query is a dict
                #
                self.oCursor.execute( "SELECT * FROM %s WHERE %s" % (self.strTable, self.DictToWhere( strQuery)))

            self.PopulateFields( self.oCursor)

        def FetchOne( self):
            """
            Get next row of results
            Returns a dictionary holding field names and values,
            or None when there are no more rows.
            """
            oRow = self.oCursor.fetchone()
            if oRow:
                #
                # Build a dictionary to map field name->value
                #
                return dict([(self.oFields[i], oRow[i]) for i in range(len(oRow))])
            else:
                return None

        def FetchList( self, strField):
            """
            Fetch all results for a particular column.
            """
            nField = self.FieldNumber( strField)
            oRows = self.oCursor.fetchall()
            return [oRow[nField] for oRow in oRows]

        def FetchMap( self, strKey, strValue):
            """
            Return a map of key fields to value fields.
            """
            nKey = self.FieldNumber( strKey)
            nValue = self.FieldNumber( strValue)
            #
            # List comprehensions: gotta love the potential for unreadability.
            # If I practise enough I might be able to maintain a perl script.
            #
            return dict( [(oRow[nKey], oRow[nValue]) for oRow in self.oCursor.fetchall()])

        def Insert( self, oDict):
            """
            Insert a row in the database
            Takes a dictionary holding field names and values.
            """
            self.PopulateFields()

            strFields = list( oDict.keys())
            strValues = [ self.FormatField( strField, oDict[strField]) for strField in strFields]

            strSQL = "INSERT INTO %s ( %s) VALUES(%s);" % ( self.strTable, ", ".join( strFields), ", ".join( strValues))
            if bVerbose: print(strSQL)
            oCursor = self.oConnection.cursor()
            oCursor.execute( strSQL)
            self.oConnection.commit()

        def Update( self, oDictWhere, oDictNew):
            """
            Update a row in the database
            Takes a dictionary holding field names and values to find
            and a dictionary of new values to replace them with.
            """
            self.PopulateFields()

            strValues = []
            for strField in oDictNew.keys():
                strValues.append( "%s = %s" % (strField, self.FormatField( strField, oDictNew[strField])))

            strSQL = "UPDATE %s SET %s WHERE %s;" % ( self.strTable, ", ".join( strValues), self.DictToWhere( oDictWhere))
            if bVerbose: print(strSQL)
            oCursor = self.oConnection.cursor()
            oCursor.execute( strSQL)
            self.oConnection.commit()

        def InsertOrUpdate( self, oDictWhere, oDictNew):
            """
            Seek record in database, add it if not found, update it if found.
            """
            self.Select( oDictWhere)
            if self.FetchOne():
                self.Update( oDictWhere, oDictNew)
            else:
                self.Insert( oDictNew)

        def Delete( self, oDict):
            """
            Delete row based on dictionary contents
            Takes a dictionary holding field names and values.
            """
            strSQL = "DELETE FROM %s WHERE %s;" % ( self.strTable, self.DictToWhere( oDict))
            if bVerbose: print(strSQL)
            oCursor = self.oConnection.cursor()
            oCursor.execute( strSQL)
            self.oConnection.commit()

        def DictToWhere( self, oDict):
            """
            Convert dictionary to WHERE clause.
            """
            self.PopulateFields()

            strExpressions = []

            for strField in oDict.keys():
                strValue = oDict[strField]

                if isinstance( strValue, (tuple, list)):
                    #
                    # Value is a list or a tuple so build an IN expression
                    #
                    strTerms = [ self.FormatField( strField, strTerm) for strTerm in strValue]

                    strExpressions.append( '%s IN( %s)' % (strField, ", ".join( strTerms)))
                else:
                    strValue = self.FormatField( strField, strValue)

                    strExpressions.append( '%s = %s' % (strField, strValue))

            return " AND ".join( strExpressions)

        def FormatField( self, strField, strValue):
            """
            Format a field for an sql statement.
            """
            self.PopulateFields()

            strType = self.oFieldDescription[strField][DBTable.FIELD_TYPE]
            #
            # 'NUMBER' is the type reported by odbc; 3 appears to be the
            # type code MySQLdb reports for LONG integer columns.
            #
            if strType == 'NUMBER' or strType == 3:
                return '%d' % int( strValue)
            else:
                #
                # 'STRING' and any unrecognised type: treat as a string,
                # doubling any embedded single quotes.
                #
                return "'%s'" % str(strValue).replace( "'", "''")

        def FieldNumber( self, strField):
            """
            Return the index of the specified field in the results.
            """
            self.PopulateFields()

            return self.oFields.index( strField)
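As a quick illustration of what FetchList and FetchMap return, the sketch below reproduces both against a plain DB-API cursor. It assumes an in-memory sqlite3 database in place of ODBC or MySQLdb so that it can run anywhere; the table colours and the function names are invented for the example, and it is written for Python 3.

```python
import sqlite3


def field_number(cursor, field):
    """Index of the named field in the cursor's result columns."""
    return [column[0] for column in cursor.description].index(field)


def fetch_list(cursor, field):
    """All remaining values of one column, in result order."""
    n = field_number(cursor, field)
    return [row[n] for row in cursor.fetchall()]


def fetch_map(cursor, key_field, value_field):
    """Dictionary mapping one column's values to another's."""
    k = field_number(cursor, key_field)
    v = field_number(cursor, value_field)
    return {row[k]: row[v] for row in cursor.fetchall()}


# Hypothetical table, used only to have something to query.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE colours (id INTEGER, name TEXT)")
connection.executemany("INSERT INTO colours VALUES (?, ?)",
                       [(1, "red"), (2, "green")])

cursor = connection.cursor()
cursor.execute("SELECT * FROM colours ORDER BY id")
names = fetch_list(cursor, "name")

# fetchall() consumes the results, so run the query again before the next call.
cursor.execute("SELECT * FROM colours ORDER BY id")
by_id = fetch_map(cursor, "id", "name")
print(names, by_id)
```

As in DBTable, each helper uses up the whole result set, so a fresh Select is needed before calling another one.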

Peter

RJackson Says:

over 4 years ago

This was extremely useful!! Thank you for publishing.

mordechai.malka@hds.com Says:

about 1 year ago

Hi Peter, I am relatively new to open systems (I am a mainframe SE). I would like to access the Oracle database of an EMC storage array from Python, but I don't understand from your DBTable how to create the oConnection. According to the EMC manual I should use the service name of the database, RAMDBD. I have the IP address of the Oracle server, the user ID stsview and the password sts. Can I use all this info to create the oConnection variable of your DBTable class? Many thanks and regards,

Peter Says:

about 1 year ago

Like this:

import odbc

o = odbc.odbc( '<odbc connectionname>')
c = o.cursor()
c.execute( "INSERT INTO Sequence (Description) VALUES( 'Blah')")

The details of the ODBC connection depend on your Oracle backend. I have no experience with Oracle, but for MySQL the connection would specify the host, database, username and password.

Peter

Macbaden Says:

about 1 year ago

I'm new to Python programming. Can you help me make a simple MySQL connection? I am using the IDLE GUI for Python. I'm also trying to deploy or dump Python code into HTML; is this possible, sir? I'm using this for a report: a realtime monitoring engine for contact solutions.

Peter Says:

about 1 year ago

If you want to generate HTML reports from a database then you should try a Python web application framework like Django or TurboGears. These would have the pieces you need:

  • MySQL access
  • HTML templating
  • glue logic

Peter

macbaden Says:

about 1 year ago

Thank you so much sir.

Really appreciate it. This is my third day of programming in Python. My main programming languages are VB 6 and SQL. I hope you can also teach me the console mode environment for programming an Informix database.

Peter Says:

about 1 year ago

Unfortunately I know nothing about Informix.

Peter
