Skip to content
This repository was archived by the owner on Feb 25, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions tools/import/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name := "import-tools"

organization := "org.precog"

version := "0.1"

scalaVersion := "2.9.2"


scalacOptions ++= Seq("-unchecked", "-deprecation")

18 changes: 18 additions & 0 deletions tools/import/common/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name := "import-common"

organization := "org.precog"

version := "0.1"

scalaVersion := "2.9.2"

resolvers ++= Seq(
"ReportGrid (public)" at "http://nexus.reportgrid.com/content/repositories/public-releases"
)

libraryDependencies ++= Seq(
"com.reportgrid" %% "blueeyes-core" % "1.0.0-M8.1",
"com.reportgrid" %% "blueeyes-json" % "1.0.0-M8.1"
)


5 changes: 5 additions & 0 deletions tools/import/common/project/Build.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import sbt._
object CommonProj extends Build
{
lazy val root = Project("import-common", file("."))
}
1 change: 1 addition & 0 deletions tools/import/common/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.7")
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.precog.tools.importers.common

import annotation.tailrec

/**
* User: gabriel
* Date: 1/25/13
*/
object ConsoleUtils {

@tailrec
def selectSet[T](label:String, available: Seq[T], selected: Seq[T]=List()): Seq[T] =
if (available.isEmpty) selected
else {

println("Available %ss:".format(label))
println(present(available))

println("Selected %ss:".format(label))
println(present(selected))

println("Select %ss by entering its number or name, 0 to select all, enter to continue: ".format(label))

val selIdx = readLine()
selIdx match {
case "" => selected
case ParseInt(0) => available
case ParseInt(x) if x<=available.size => {
val elem:T = available(x - 1)
selectSet(label,available.filterNot(_==elem), selected:+elem)
}
case s:String if (available.exists(_.toString == s)) => {
val elem:T =available.find(_.toString == s).get
selectSet(label,available.filterNot(_==elem), selected:+elem)
}
case _ => selectSet(label,available, selected)
}
}

@tailrec
def selectOne[T](label:String, available: Seq[T]): T ={

println("Available %ss:".format(label))
println(present(available))

println("Select one %s by entering its number or name: ".format(label))

val selIdx = readLine()
selIdx match {
case ParseInt(x) if x<=available.size => available(x - 1)
case s:String => available.find(_.toString == s) match {
case Some(t) => t
case None => selectOne(label,available)
}
case _ => selectOne(label,available)
}
}

def present[T](arr:Seq[T])= arr.zipWithIndex.map({ case (a, b) => (b+1) + ":" + a }).mkString(", ")


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.precog.tools.importers.common

import akka.dispatch._
import akka.dispatch.Future
import scalaz.Monad

import blueeyes.json._
import blueeyes.core.data.DefaultBijections._
import blueeyes.core.service._
import blueeyes.core.service.engines.HttpClientXLightWeb
import blueeyes.bkka.FutureMonad
import scalaz.StreamT
import java.nio.ByteBuffer
import blueeyes.core.http.HttpResponse
import blueeyes.core.data.ByteChunk
import scala.Right
import org.slf4j.LoggerFactory

/**
* User: gabriel
* Date: 3/21/13
*/
object Ingest {

private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.jdbc.Ingest")

def sendToPrecog(host:String, path:String, apiKey:String, dataStream:StreamT[Future,ByteBuffer], streaming:Boolean = true)(implicit executor:ExecutionContext): Future[HttpResponse[ByteChunk]] = {
implicit val M = new FutureMonad(executor)
val httpClient = new HttpClientXLightWeb()(executor)

dataStream.isEmpty.flatMap( isEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wtf, as seen on #precog-internals. :-)

if (isEmpty) {
logger.info("No need to send empty data stream")
Future(HttpResponse.empty)
} else {
val byteChunks: ByteChunk = Right(dataStream)
//val fullPath = "%s/ingest/v1/fs%s".format(host, path)
val fullPath = "%s/fs%s".format(host, path) //local test only
val ingestParams = ('apiKey -> apiKey)::( if (streaming) List('mode -> "streaming") else List('mode -> "batch", 'receipt -> "true"))
logger.info("Ingesting to %s".format(path))
httpClient.parameters(ingestParams:_*).header("Content-Type","application/json").post(fullPath)(byteChunks)
}
)
}

def callSucceded(response:HttpResponse[ByteChunk]){
response match {
case HttpResponse(_ ,_,Some(Left(buffer)),_) => logger.info("Result: %s".format(new String(buffer.array(), "UTF-8")))
case _ => logger.error("Unexpected stream in %s".format(response))
}
}


def toByteStream(dataStream: StreamT[Future, JValue])(implicit m:Monad[Future]): StreamT[Future, ByteBuffer] = {
dataStream.map(jv => ByteBuffer.wrap({
val js = "%s\n".format(jv.renderCompact)
logger.trace("to bytes = %s".format(js.replace('\n',' ')))
js
}.getBytes("UTF-8")))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.precog.tools.importers

/**
* User: gabriel
* Date: 1/25/13
*/
package object common {

object ParseInt{
def unapply(s : String) : Option[Int] = try {
Some(s.toInt)
} catch {
case _ : java.lang.NumberFormatException => None
}
}


}
11 changes: 8 additions & 3 deletions tools/import/jdbc/build.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
name := "import-jdbc"

organization := "org.precog"

version := "0.1"

scalaVersion := "2.9.2"
Expand All @@ -17,9 +19,12 @@ resolvers ++= Seq(

scalacOptions ++= Seq("-unchecked", "-deprecation")

assemblySettings

libraryDependencies ++= Seq(
"com.reportgrid" % "blueeyes-core_2.9.2" % "1.0.0-M6",
"com.reportgrid" % "blueeyes-json_2.9.2" % "1.0.0-M6",
"com.reportgrid" % "blueeyes-core_2.9.2" % "1.0.0-M8.1",
"com.reportgrid" % "blueeyes-json_2.9.2" % "1.0.0-M8.1",
"org.specs2" %% "specs2" % "1.12.2" ,
"com.h2database" % "h2" % "1.2.134" % "test"
"com.h2database" % "h2" % "1.2.134" % "test",
"ch.qos.logback" % "logback-classic" % "1.0.0"
)
9 changes: 9 additions & 0 deletions tools/import/jdbc/project/Build.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import sbt._
object JdbcImportProj extends Build
{
lazy val root =
Project("import-jdbc", file(".")) dependsOn(common)
lazy val common =
ProjectRef(uri("../common/"), "import-common")
}

1 change: 1 addition & 0 deletions tools/import/jdbc/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.7")
8 changes: 8 additions & 0 deletions tools/import/jdbc/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
blueeyes-async {
name = "DefaultActorPool"
keep-alive-time = 5s
core-pool-size-factor = 1.0
core-pool-size-max = 8
max-pool-size-factor = 1.0
max-pool-size-max = 8
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ package com.precog.tools.importers.jdbc

import java.sql._
import Datatypes._
import scalaz.StreamT
import annotation.tailrec
import scalaz.effect.IO
import org.slf4j.LoggerFactory

/**
* User: gabriel
* Date: 11/30/12
*/
object DbAccess {

private lazy val logger = LoggerFactory.getLogger("com.precog.tools.importers.jdbc.DbAccess")

def columnCount(stmt:PreparedStatement)=stmt.getMetaData.getColumnCount

def getColumns(conn:Connection, table:Table):IndexedSeq[Column]={
Expand All @@ -26,24 +33,31 @@ object DbAccess {
for ( i <- 1 to count) yield Column(tblMetaData.getColumnName(i),Table(tblMetaData.getTableName(i)))
}

def rsIterator[T](rs:ResultSet)(f:ResultSet => T) = new Iterator[T] {
def hasNext = rs.next()
def next():T = f(rs)


def rsList[T](rs:ResultSet)(f:ResultSet => T)={

@tailrec
def buildList(rs:ResultSet, acc:List[T]=Nil):List[T]=
if (rs.next()) buildList(rs, f(rs)::acc)
else acc

buildList(rs).reverse
}

def oneColumnRs(rs:ResultSet) = rsIterator(rs)(rs=> rs.getString(1))
def tables(rs:ResultSet) = rsIterator(rs)(rs=> Table(rs.getString("TABLE_NAME")))
def columns(rs:ResultSet) = rsIterator(rs)(rs=> Column(rs.getString("COLUMN_NAME"), Table(rs.getString("TABLE_NAME"))))
def relationshipDesc(rs:ResultSet) = rsIterator(rs)(
def rsStreamT[T](rs:ResultSet)(f:ResultSet => T)=StreamT.unfoldM(rs)(
(rs:ResultSet) => IO( { val d=if (rs.next()) { Some(f(rs),rs)} else None; logger.info("read stream = %s".format(d)); d }))

def rsStream[T](rs:ResultSet)(f:ResultSet => T):Stream[T] = if (rs.next) f(rs) #:: rsStream(rs)(f) else Stream.empty

def oneColumnRs(rs:ResultSet) =rsList(rs)(rs=> rs.getString(1))
def tables(rs:ResultSet) = rsList(rs)(rs=> Table(rs.getString("TABLE_NAME")))
def columns(rs:ResultSet) = rsList(rs)(rs=> Column(rs.getString("COLUMN_NAME"), Table(rs.getString("TABLE_NAME"))))
def relationshipDesc(rs:ResultSet) = rsList(rs)(
rs=> PkFkRelation(
Key(Table(rs.getString("PKTABLE_NAME")),rs.getString("PKCOLUMN_NAME")),
Key(Table(rs.getString("FKTABLE_NAME")),rs.getString("FKCOLUMN_NAME")),
rs.getInt("KEY_SEQ")
)
)

def allSet(rs:ResultSet) = {
val count= rs.getMetaData.getColumnCount
rsIterator(rs)(rs=> for ( i <- 1 to count) yield rs.getString(i) )
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object DbAnalysis{


def findTables(metadata: DatabaseMetaData, oCat: Option[String], tableName: => Option[String]): Array[Table] = {
val cat= toNullUppercase(oCat)
val cat= oCat.getOrElse(null)
val tableNm= tableName.map(_.toUpperCase).getOrElse(null)
tables(metadata.getTables(cat, null, tableNm, Array("TABLE"))).toArray
}
Expand Down
Loading