在 Oracle 数据库中实现 MapReduce(2)

-- Demo corpus: one document per row, stored in a single CLOB column.
-- NOTE(review): the LOB segment is placed in SYSAUX, presumably because
-- SECUREFILE LOBs need an ASSM tablespace and SYSAUX is one. SYSAUX is meant
-- for Oracle-internal components, so use an ASSM user tablespace instead for
-- anything beyond a throwaway demo.
CREATE TABLE documents (a CLOB)
  LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

-- Three space-separated two-word documents; "def" and "ghi" each occur twice,
-- so the word counts are easy to verify by eye.
INSERT INTO documents (a) VALUES ('abc def');
INSERT INTO documents (a) VALUES ('def ghi');
INSERT INTO documents (a) VALUES ('ghi jkl');
COMMIT;
 
create or replace
package oracle_map_reduce is
 
  -- Word count expressed as two chained pipelined table functions:
  -- MAPPER splits documents into words, REDUCER counts each distinct word.

  -- One token: a mapper output row and a reducer input row.
  type word_t    is record (word varchar2(4000));
  type words_t    is table of word_t;
 
  -- Strongly typed cursor over WORD_T rows. The reducer's
  -- PARTITION ... BY HASH(word) clause is declared on a cursor of this type.
  type word_cur_t is ref cursor return word_t;
  -- One reducer output row: a word and the number of times it was seen.
  type wordcnt_t  is record (word varchar2(4000), count number);
  type wordcnts_t is table of wordcnt_t;
 
  -- Map step: fetches CLOB documents from DOC and pipes one WORD_T row per
  -- SEP-delimited token. PARTITION doc BY ANY leaves Oracle free to spread
  -- the documents across parallel execution servers.
  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
    pipelined parallel_enable (partition doc by any);
 
  -- Reduce step: HASH(word) partitioning routes every occurrence of a word
  -- to the same parallel execution server, and CLUSTER BY (word) asks for
  -- equal words to arrive together, so the body can count with a running
  -- counter instead of a lookup table.
  function reducer(in_cur in word_cur_t) return wordcnts_t
    pipelined parallel_enable (partition in_cur by hash(word))
    cluster in_cur by (word);
 
end;
/
 
create or replace
package body oracle_map_reduce is

  --
  -- The mapper is a simple tokenizer. It splits every fetched document on
  -- SEP and pipes one WORD_T row per non-empty token.
  --
  --   doc - cursor returning one CLOB document per row; closed on exit.
  --   sep - separator string, which may be longer than one character.
  --         A NULL separator (Oracle also treats '' as NULL) disables
  --         splitting, so each non-empty document becomes a single token.
  --
  -- Empty tokens, produced by leading or repeated separators, are skipped
  -- rather than piped as NULL words.
  -- NOTE(review): a token longer than 4000 characters does not fit
  -- WORD_T.WORD and should raise a value error on assignment.
  --
  function mapper(doc in sys_refcursor, sep in varchar2) return words_t
    pipelined parallel_enable (partition doc by any)
  is
    document clob;
    istart   number;
    pos      number;
    len      number;
    -- Width to skip after a match: the whole separator, not one character.
    sep_len  number := nvl(length(sep), 0);
    word_rec word_t;
  begin

    -- for every document
    loop

      fetch doc into document;
      exit when doc%notfound;

      istart := 1;
      -- A NULL document has a NULL length; treat it as empty.
      len := nvl(length(document), 0);

      -- For every token within a document
      while istart <= len loop

        if sep_len = 0 then
          pos := 0;  -- no separator: the rest of the document is one token
        else
          pos := instr(document, sep, istart);
        end if;

        if pos = 0 then
          -- No further separator: the remainder is the last token.
          word_rec.word := substr(document, istart);
          istart := len + 1;
        else
          word_rec.word := substr(document, istart, pos - istart);
          istart := pos + sep_len;
        end if;

        -- An empty token arrives here as NULL; do not emit it.
        if word_rec.word is not null then
          pipe row (word_rec);
        end if;

      end loop; -- end loop for a single document

    end loop; -- end loop for all documents

    close doc;
    return;

  end mapper;

  --
  -- The reducer emits each word together with the number of times it was
  -- seen.
  --
  --   in_cur - cursor of WORD_T rows; closed on exit.
  --
  -- The reducer counts runs of equal consecutive words, so correctness
  -- depends on CLUSTER BY (word) delivering every occurrence of a word
  -- contiguously within a partition.
  -- NOTE(review): confirm that clustering is also applied when the function
  -- runs serially; if it is not, feed it an ORDER BY word cursor.
  -- NULL words form a group of their own, matching GROUP BY semantics,
  -- instead of being folded into a neighbouring word's count.
  --
  function reducer(in_cur in word_cur_t) return wordcnts_t
    pipelined parallel_enable (partition in_cur by hash(word))
    cluster in_cur by (word)
  is
    word_count wordcnt_t;
    next_word  varchar2(4000);
    -- Tracks whether WORD_COUNT holds an open group. A NULL word cannot
    -- serve as the "no group yet" sentinel, because NULL is a valid word.
    have_group boolean := false;
  begin

    loop

      fetch in_cur into next_word;
      exit when in_cur%notfound;

      -- Null-safe equality: a plain '=' yields NULL (not TRUE) for NULLs.
      if have_group
         and (next_word = word_count.word
              or (next_word is null and word_count.word is null)) then

        word_count.count := word_count.count + 1;

      else

        -- A new word begins: flush the finished group, then start a new one.
        if have_group then
          pipe row (word_count);
        end if;

        word_count.word  := next_word;
        word_count.count := 1;
        have_group       := true;

      end if;

    end loop;

    close in_cur;

    -- Flush the last open group, if any input was seen.
    if have_group then
      pipe row (word_count);
    end if;

    return;

  end reducer;

end;
/
 
 
-- Select statements

-- Reference answer: count words with a plain GROUP BY over the mapper output.
select word,
       count(*) as word_count
  from (
        select value(map_result).word as word
          from table(oracle_map_reduce.mapper(cursor(select a from documents), ' ')) map_result)
 group by word
 order by word;

-- MapReduce answer: pipe the mapper output into the reducer, which emits one
-- (word, count) row per distinct word. It is expected to return the same
-- rows as the GROUP BY query above; ORDER BY makes the two easy to compare.
select reduce_result.word,
       reduce_result.count as word_count
  from table(oracle_map_reduce.reducer(
              cursor(select value(map_result).word as word
                       from table(oracle_map_reduce.mapper(
                         cursor(select a from documents), ' ')) map_result))) reduce_result
 order by reduce_result.word;

英文原文：In-Database MapReduce (Map-Reduce)

内容版权声明：除非注明，否则皆为本站原创文章。

转载注明出处：https://www.heiqu.com/6dfbdc0d9be9cce0fd7f68dc481ca686.html