2010年4月9日金曜日

Blobstoreのデータをアプリケーションで利用する

Google App Engine SDK 1.3.2からはBlobstoreService APIにfetchDataというメソッドが追加されています。 これまではBlobstoreにデータをアップロードしても、アプリケーションの中から中身を見ることができなかったのですが、これを使うとそれができるようになる様子です。

ただしこのメソッド、1回に1MBずつしか転送できなかったりして面倒なので、InputStreamでラップしてみました。軽く実験したところ10MBのファイルを1秒程度で読み出せたりするなど、妙に優秀なので何か間違っているんじゃないかと不安になってます。

以下、ラップしたプログラムです。+expandとかで全部見えると思います。

package com.example;

import java.io.IOException;
import java.io.InputStream;
import java.text.MessageFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.appengine.api.blobstore.BlobInfo;
import com.google.appengine.api.blobstore.BlobInfoFactory;
import com.google.appengine.api.blobstore.BlobKey;
import com.google.appengine.api.blobstore.BlobstoreService;
import com.google.appengine.api.blobstore.BlobstoreServiceFactory;

/**
* Blobの中身を読み出すストリーム。
*/
public class BlobInputStream extends InputStream {

private static final Logger LOG = LoggerFactory.getLogger(
    BlobInputStream.class);

private transient BlobstoreService service;

private final BlobInfo info;

private final int chunkSize;

private long positionInBlob;

private transient int positionInChunk;

private transient byte[] chunk;

private boolean closed;

/**
 * インスタンスを生成する。
 * @param key 読みだす対象のBlobへのキー
 * @param chunkSize 分割して読みだす個々のチャンクのバイト数
 * @throws IOException Blobが存在しない場合や、Blobのメタ情報取得に失敗した場合
 * @throws IllegalArgumentException 引数に{@code null}が含まれる場合、
 *     または{@code chunkSize}に0以下の値が指定された場合
 */
public BlobInputStream(BlobKey key, int chunkSize) throws IOException {
    if (key == null) {
        throw new IllegalArgumentException("key is null"); //$NON-NLS-1$
    }
    if (chunkSize <= 0) {
        throw new IllegalArgumentException("chunkSize <= 0"); //$NON-NLS-1$
    }
    this.service = BlobstoreServiceFactory.getBlobstoreService();
    this.info = new BlobInfoFactory().loadBlobInfo(key);
    if (info == null) {
        throw new IOException(MessageFormat.format(
            "BlobInfo({0}) is not found",
            key.toString()));
    }
    this.chunkSize = Math.min(chunkSize, BlobstoreService.MAX_BLOB_FETCH_SIZE);
    this.positionInBlob = 0;
    this.positionInChunk = 0;
    this.chunk = null;
    this.closed = false;
}

/**
 * インスタンスを生成する。
 * @param info 読みだす対象のBlobメタ情報
 * @param chunkSize 分割して読みだす個々のチャンクのバイト数
 * @throws IllegalArgumentException 引数に{@code null}が含まれる場合、
 *     または{@code chunkSize}に0以下の値が指定された場合
 */
public BlobInputStream(BlobInfo info, int chunkSize) {
    if (info == null) {
        throw new IllegalArgumentException("info is null"); //$NON-NLS-1$
    }
    if (chunkSize <= 0) {
        throw new IllegalArgumentException("chunkSize <= 0"); //$NON-NLS-1$
    }
    this.service = BlobstoreServiceFactory.getBlobstoreService();
    this.info = info;
    this.chunkSize = Math.min(chunkSize, BlobstoreService.MAX_BLOB_FETCH_SIZE);
    this.positionInBlob = 0;
    this.positionInChunk = 0;
    this.chunk = null;
    this.closed = false;
}

/**
 * インスタンスを生成する。
 * @param service 実際にBlobの内容を読み出すサービス
 * @param info 読みだす対象のBlobメタ情報
 * @param chunkSize 分割して読みだす個々のチャンクのバイト数
 * @throws IllegalArgumentException 引数に{@code null}が含まれる場合、
 *     または{@code chunkSize}に0以下の値が指定された場合
 */
BlobInputStream(BlobstoreService service, BlobInfo info, int chunkSize) {
    if (info == null) {
        throw new IllegalArgumentException("info is null"); //$NON-NLS-1$
    }
    if (chunkSize <= 0) {
        throw new IllegalArgumentException("chunkSize <= 0"); //$NON-NLS-1$
    }
    this.service = service;
    this.info = info;
    this.chunkSize = Math.min(chunkSize, BlobstoreService.MAX_BLOB_FETCH_SIZE);
    this.positionInBlob = 0;
    this.positionInChunk = 0;
    this.chunk = null;
    this.closed = false;
}

@Override
public int read() throws IOException {
    checkAvailable();
    if (restInBlob() == 0) {
        return -1;
    }
    prepareChunk();
    int c = chunk[positionInChunk] & 0xff;
    advance(1);
    return c;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
    checkAvailable();
    if (b == null) {
        throw new IllegalArgumentException("b is null"); //$NON-NLS-1$
    }
    if (len == 0) {
        return 0;
    }
    if (restInBlob() == 0) {
        return -1;
    }
    int read = (int) Math.min(restInBlob(), len);
    int cursor = off;
    int rest = read;
    while (rest > 0) {
        assert restInBlob() > 0;
        prepareChunk();
        int readInChunk = Math.min(rest, restInChunk());
        System.arraycopy(
            chunk, positionInChunk,
            b, cursor,
            readInChunk);
        advance(readInChunk);
        rest -= readInChunk;
        cursor += readInChunk;
    }
    assert rest == 0;
    return read;
}

@Override
public int available() throws IOException {
    checkAvailable();
    return restInChunk();
}

@Override
public long skip(long n) throws IOException {
    checkAvailable();
    if (n < 0) {
        throw new IllegalArgumentException();
    }
    if (n == 0) {
        return 0;
    }
    long skip = Math.min(n, restInBlob());
    advance(skip);
    return skip;
}

@Override
public void close() throws IOException {
    chunk = null;
    closed = true;
}

private void advance(long count) {
    assert count <= restInBlob();
    positionInBlob += count;
    if (chunk != null) {
        if (count < restInChunk()) {
            positionInChunk += (int) count;
        }
        else {
            chunk = null;
            positionInChunk = 0;
        }
    }
}

private void prepareChunk() throws IOException {
    assert restInBlob() > 0;
    if (chunk != null) {
        assert positionInChunk < chunk.length;
        return;
    }
    long endIndex = Math.min(info.getSize(), positionInBlob + chunkSize);
    LOG.debug("Preparing chunk: name={}, size={}, range=[{}, {})", new Object[] {
            info.getFilename(),
            info.getSize(),
            positionInBlob,
            endIndex
    });
    try {
        chunk = service.fetchData(
            info.getBlobKey(),
            positionInBlob, endIndex);
        positionInChunk = 0;
    }
    catch (Exception e) {
        throw new IOException(e);
    }
    assert chunk.length > 0 : positionInBlob;
}

private int restInChunk() {
    if (chunk == null) {
        return 0;
    }
    return chunk.length - positionInChunk;
}

private long restInBlob() {
    return info.getSize() - positionInBlob;
}

private void checkAvailable() throws IOException {
    if (closed) {
        throw new IOException(MessageFormat.format(
            "{0} for {1} is already closed",
            getClass().getSimpleName(),
            info.getBlobKey()));
    }
}
}
追記 (2011.01.14 KATO)
1.3.5以降からBlobstoreInputStreamが提供されていますので、そちらをお使いください。

0 件のコメント:

コメントを投稿