Skip to content
This repository was archived by the owner on Jun 23, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 57 additions & 16 deletions src/com/google/enterprise/adaptor/database/DatabaseAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import static java.util.Locale.US;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.enterprise.adaptor.AbstractAdaptor;
import com.google.enterprise.adaptor.Acl;
import com.google.enterprise.adaptor.AdaptorContext;
Expand Down Expand Up @@ -78,6 +80,9 @@ public class DatabaseAdaptor extends AbstractAdaptor {
/** Map from SQL types to SQL type names. */
private static final HashMap<Integer, String> sqlTypeNames = new HashMap<>();

/** Cache for DocId and last modified date. */
private static Cache<DocId, Date> lastModifiedCache;

static {
for (Field field : Types.class.getFields()) {
if (field.getType() == int.class
Expand Down Expand Up @@ -136,6 +141,8 @@ private static boolean isNullOrEmptyString(String str) {
private boolean docIdIsUrl;
private String modeOfOperation;

private Calendar updateTimestampTimezone;

@Override
public void initConfig(Config config) {
config.addKey("db.driverClass", null);
Expand Down Expand Up @@ -292,6 +299,16 @@ public void init(AdaptorContext context) throws Exception {

String updateSql = cfg.getValue("db.updateSql");
String tzString = cfg.getValue("db.updateTimestampTimezone");

if (isNullOrEmptyString(tzString)) {
updateTimestampTimezone = Calendar.getInstance();
} else {
updateTimestampTimezone =
Calendar.getInstance(TimeZone.getTimeZone(tzString));
}
log.config("updateTimestampTimezone: "
+ updateTimestampTimezone.getTimeZone().getDisplayName());

if (!updateSql.isEmpty()) {
context.setPollingIncrementalLister(
new DbAdaptorIncrementalLister(updateSql, tzString));
Expand Down Expand Up @@ -353,6 +370,10 @@ public void init(AdaptorContext context) throws Exception {

uniqueKey = ukBuilder.build();
log.config("primary key: " + uniqueKey);

lastModifiedCache =
CacheBuilder.newBuilder().initialCapacity(10000).maximumSize(1000000L)
.build();
}

/** A SQL data type, with a type code and type name. */
Expand Down Expand Up @@ -466,14 +487,19 @@ public void getDocIds(DocIdPusher pusher) throws IOException,
if (everyDocIdSql.isEmpty()) {
throw new IOException("db.everyDocIdSql cannot be an empty string");
}
lastModifiedCache.invalidateAll();
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z");
formatter.setTimeZone(updateTimestampTimezone.getTimeZone());
BufferedPusher outstream = new BufferedPusher(pusher);
try (Connection conn = makeNewConnection();
PreparedStatement stmt = getStreamFromDb(conn, everyDocIdSql);
ResultSet rs = stmt.executeQuery()) {
log.finer("queried for stream");
boolean hasAction
= hasColumn(rs.getMetaData(), GsaSpecialColumns.GSA_ACTION);
ResultSetMetaData rsmd = rs.getMetaData();
boolean hasAction = hasColumn(rsmd, GsaSpecialColumns.GSA_ACTION);
log.log(Level.FINEST, "Has GSA_ACTION column: {0}", hasAction);
boolean hasTimestamp = hasColumn(rsmd, GsaSpecialColumns.GSA_TIMESTAMP);
log.log(Level.FINEST, "Has GSA_TIMESTAMP column: {0}", hasTimestamp);
while (rs.next()) {
DocId id;
try {
Expand All @@ -483,6 +509,18 @@ public void getDocIds(DocIdPusher pusher) throws IOException,
+ e.getMessage());
continue;
}
// Cache last modified time stamp for this DocId
if (hasTimestamp) {
Timestamp ts =
rs.getTimestamp(GsaSpecialColumns.GSA_TIMESTAMP.toString(),
updateTimestampTimezone);
if (ts != null) {
Date lastModified = new Date(ts.getTime());
lastModifiedCache.put(id, lastModified);
log.log(Level.FINE, "lastModifiedCache updated: {0}",
formatter.format(lastModified));
}
}
DocIdPusher.Record.Builder builder = new DocIdPusher.Record.Builder(id);
if (hasAction && isDeleteAction(rs)) {
builder.setDeleteFromIndex(true);
Expand Down Expand Up @@ -674,6 +712,14 @@ public void getDocContent(Request req, Response resp) throws IOException {
return;
}
DocId id = req.getDocId();
// Check if modified since last access.
Date cachedLastModified = lastModifiedCache.getIfPresent(id);
if (cachedLastModified != null
&& !req.hasChangedSinceLastAccess(cachedLastModified)) {
log.log(Level.FINE, "Content not modified since last crawl: {0}", id);
resp.respondNotModified();
return;
}
try (Connection conn = makeNewConnection();
PreparedStatement stmt = getDocFromDb(conn, id.getUniqueId());
ResultSet rs = stmt.executeQuery()) {
Expand All @@ -684,6 +730,7 @@ public void getDocContent(Request req, Response resp) throws IOException {
resp.respondNotFound();
return;
}
log.log(Level.FINE, "Content modified since last crawl: {0}", id);
// Generate response metadata first.
addMetadataToResponse(resp, rs);
// Generate Acl if aclSql is provided.
Expand Down Expand Up @@ -985,29 +1032,19 @@ private static String getResponseGeneratorMethods() {
// next full push to be sent to GSA.
private class DbAdaptorIncrementalLister implements PollingIncrementalLister {
private final String updateSql;
private final Calendar updateTimestampTimezone;
private Timestamp lastUpdateTimestamp;
private final DateFormat formatter;

public DbAdaptorIncrementalLister(String updateSql, String tzString) {
this.updateSql = updateSql;
if (isNullOrEmptyString(tzString)) {
updateTimestampTimezone = Calendar.getInstance();
} else {
updateTimestampTimezone =
Calendar.getInstance(TimeZone.getTimeZone(tzString));
}
log.config("update sql: " + this.updateSql);
log.config("updateTimestampTimezone: "
+ updateTimestampTimezone.getTimeZone().getDisplayName());
this.lastUpdateTimestamp = new Timestamp(System.currentTimeMillis());
formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z");
formatter.setTimeZone(updateTimestampTimezone.getTimeZone());
}

@Override
public void getModifiedDocIds(DocIdPusher pusher)
throws IOException, InterruptedException {
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z");
formatter.setTimeZone(updateTimestampTimezone.getTimeZone());
BufferedPusher outstream = new BufferedPusher(pusher);
// latestTimestamp will be used to update lastUpdateTimestamp
// if GSA_TIMESTAMP column is present in the ResultSet and there is at
Expand Down Expand Up @@ -1042,17 +1079,21 @@ public void getModifiedDocIds(DocIdPusher pusher)
log.log(Level.FINEST, "doc id: {0}", id);
outstream.add(record);

// update latestTimestamp
// update time stamps
if (hasTimestamp) {
Timestamp ts =
rs.getTimestamp(GsaSpecialColumns.GSA_TIMESTAMP.toString(),
updateTimestampTimezone);
if (ts != null) {
Date lastModified = new Date(ts.getTime());
if (latestTimestamp == null || ts.after(latestTimestamp)) {
latestTimestamp = ts;
log.log(Level.FINE, "latestTimestamp updated: {0}",
formatter.format(new Date(latestTimestamp.getTime())));
formatter.format(lastModified));
}
lastModifiedCache.put(id, lastModified);
log.log(Level.FINE, "lastModifiedCache updated: {0}",
formatter.format(lastModified));
}
}
}
Expand Down
180 changes: 180 additions & 0 deletions test/com/google/enterprise/adaptor/database/DatabaseAdaptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,186 @@ public void testGetDocContent_sqlException() throws Exception {
adaptor.getDocContent(request, response);
}

private Timestamp now() {
return new Timestamp(System.currentTimeMillis());
}

private Timestamp nowPlusMinutes(int minutes) {
long millis = System.currentTimeMillis() + MINUTES.toMillis(minutes);
return new Timestamp(millis);
}

private Map<String, String> initDataAndConfig(String content,
Timestamp modifyTs) throws SQLException {
executeUpdate("create table data(id integer, content varchar(20), "
+ "gsa_timestamp timestamp)");
executeUpdate("insert into data(id, content, gsa_timestamp) values(1, '"
+ content + "', {ts '" + modifyTs + "'})");

Map<String, String> configEntries = new HashMap<String, String>();
configEntries.put("db.uniqueKey", "id:int");
configEntries.put("db.everyDocIdSql", "select * from data");
configEntries.put("db.singleDocContentSql",
"select * from data where id = ?");
configEntries.put("db.modeOfOperation", "contentColumn");
configEntries.put("db.modeOfOperation.contentColumn.columnName", "content");
return configEntries;
}

@Test
public void testIfModifiedSince_null() throws Exception {
String content = "Hello";
Map<String, String> configEntries = initDataAndConfig(content, now());

DatabaseAdaptor adaptor = getObjectUnderTest(configEntries);
DocRequest request = new DocRequest(new DocId("1"), null);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RecordingResponse response = new RecordingResponse(baos);
adaptor.getDocContent(request, response);
assertEquals(content, baos.toString(UTF_8.toString()));
}

private Response testIfModifiedSince_getDocIds(String content,
Timestamp modifyTs, Timestamp lastAccessTs) throws Exception {
Map<String, String> configEntries = initDataAndConfig(content, modifyTs);

DatabaseAdaptor adaptor = getObjectUnderTest(configEntries);
RecordingDocIdPusher pusher = new RecordingDocIdPusher();
adaptor.getDocIds(pusher);
DocRequest request =
new DocRequest(new DocId("1"), (lastAccessTs == null ? null
: new Date(lastAccessTs.getTime())));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RecordingResponse response = new RecordingResponse(baos);
adaptor.getDocContent(request, response);
return response;
}

@Test
public void testIfModifiedSince_getDocIds_lastAccess_null() throws Exception {
String content = "Hello";
Response response = testIfModifiedSince_getDocIds(content, now(), null);
ByteArrayOutputStream baos =
(ByteArrayOutputStream) response.getOutputStream();
assertEquals(content, baos.toString(UTF_8.toString()));
}

@Test
public void testIfModifiedSince_getDocIds_lastAccess_later()
throws Exception {
RecordingResponse response =
(RecordingResponse) testIfModifiedSince_getDocIds("Hello", now(),
nowPlusMinutes(1));
assertEquals(RecordingResponse.State.NOT_MODIFIED, response.getState());
}

@Test
public void testIfModifiedSince_getDocIds_lastAccess_earlier()
throws Exception {
String content = "Hello";
Response response =
testIfModifiedSince_getDocIds(content, nowPlusMinutes(1), now());
ByteArrayOutputStream baos =
(ByteArrayOutputStream) response.getOutputStream();
assertEquals(content, baos.toString(UTF_8.toString()));
}

private Response testIfModifiedSince_getModifiedDocIds(String content,
Timestamp modifyTs, Date lastAccessTs) throws Exception {
Map<String, String> configEntries = initDataAndConfig(content, modifyTs);
configEntries.put("db.updateSql",
"select * from data where gsa_timestamp >= ? order by id");

Holder<RecordingContext> contextHolder = new Holder<>();
DatabaseAdaptor adaptor = getObjectUnderTest(configEntries, contextHolder);
PollingIncrementalLister lister =
contextHolder.get().getPollingIncrementalLister();
RecordingDocIdPusher pusher = new RecordingDocIdPusher();
lister.getModifiedDocIds(pusher);

DocRequest request =
new DocRequest(new DocId("1"), (lastAccessTs == null ? null
: new Date(lastAccessTs.getTime())));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RecordingResponse response = new RecordingResponse(baos);
adaptor.getDocContent(request, response);
return response;
}

@Test
public void testIfModifiedSince_getModifiedDocIds_lastAccess_null()
throws Exception {
String content = "Hello";
Response response =
testIfModifiedSince_getModifiedDocIds(content, nowPlusMinutes(1), null);
ByteArrayOutputStream baos =
(ByteArrayOutputStream) response.getOutputStream();
assertEquals(content, baos.toString(UTF_8.toString()));
}

@Test
public void testIfModifiedSince_getModifiedDocIds_lastAccess_later()
throws Exception {
RecordingResponse response =
(RecordingResponse) testIfModifiedSince_getModifiedDocIds("Hello",
nowPlusMinutes(1), nowPlusMinutes(2));
assertEquals(RecordingResponse.State.NOT_MODIFIED, response.getState());
}

@Test
public void testIfModifiedSince_getModifiedDocIds_lastAccess_earlier()
throws Exception {
String content = "Hello";
Response response = testIfModifiedSince_getModifiedDocIds(content,
nowPlusMinutes(1), now());
ByteArrayOutputStream baos =
(ByteArrayOutputStream) response.getOutputStream();
assertEquals(content, baos.toString(UTF_8.toString()));
}

@Test
public void testIfModifiedSince_getDocIdsDelDoc() throws Exception {
String content = "Hello";
Timestamp modifyTs = now();
Timestamp lastAccessTs = now();
Map<String, String> configEntries = initDataAndConfig(content, modifyTs);

DatabaseAdaptor adaptor = getObjectUnderTest(configEntries);
RecordingDocIdPusher pusher = new RecordingDocIdPusher();
adaptor.getDocIds(pusher);

executeUpdate("delete from data where content = '" + content + "'");

DocRequest request =
new DocRequest(new DocId("1"), new Date(lastAccessTs.getTime()));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RecordingResponse response = new RecordingResponse(baos);
adaptor.getDocContent(request, response);
assertEquals(RecordingResponse.State.NOT_MODIFIED, response.getState());
}

@Test
public void testIfModifiedSince_getDocIdsDelDocGetDocIds() throws Exception {
String content = "Hello";
Timestamp modifyTs = now();
Timestamp lastAccessTs = now();
Map<String, String> configEntries = initDataAndConfig(content, modifyTs);

DatabaseAdaptor adaptor = getObjectUnderTest(configEntries);
RecordingDocIdPusher pusher = new RecordingDocIdPusher();
adaptor.getDocIds(pusher);

executeUpdate("delete from data where content = '" + content + "'");
adaptor.getDocIds(pusher);

DocRequest request =
new DocRequest(new DocId("1"), new Date(lastAccessTs.getTime()));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
RecordingResponse response = new RecordingResponse(baos);
adaptor.getDocContent(request, response);
assertEquals(RecordingResponse.State.NOT_FOUND, response.getState());
}

/** @see testMetadataColumns(int, String, Object, String) */
private void testMetadataColumns(int sqlType, Object input, String output)
throws Exception {
Expand Down